1#![deny(unsafe_code)]
2
3use std::convert::From as _;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::channel::{mpsc, oneshot};
10use futures::{SinkExt, StreamExt};
11use serde_json::{self, json};
12
13use rmqtt::{
14 fitter::Fitter,
15 hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
16 plugin::{PackageInfo, Plugin},
17 register,
18 session::Session,
19 types::DisconnectInfo,
20 types::{ClientId, From, Publish, SessionSubMap, SessionSubs, TimestampMillis},
21 utils::timestamp_millis,
22 Result,
23};
24
25use rmqtt_storage::{init_db, DefaultStorageDB, List, Map, StorageType};
26
27use config::PluginConfig;
28use rmqtt::context::ServerContext;
29use rmqtt::inflight::OutInflightMessage;
30use rmqtt::macros::Plugin;
31use rmqtt::session::SessionState;
32use session::{Basic, StorageSessionManager, StoredSessionInfo, StoredSessionInfos};
33use session::{StoredKey, BASIC, DISCONNECT_INFO, INFLIGHT_MESSAGES, LAST_TIME, SESSION_SUB_MAP};
34
35mod config;
36mod session;
37
/// Messages consumed by the rebuild worker started in
/// `StoragePlugin::start_local_runtime`.
enum RebuildChanType {
    /// A rebuilt offline session and the time it still has before it expires.
    Session(Session, Duration),
    /// End-of-batch marker: the worker flushes its pending tasks and then
    /// signals the enclosed sender.
    Done(oneshot::Sender<()>),
}
42
/// Element type of the persisted offline-message lists: an optional
/// `(client id, sender, publish)` triple.
type OfflineMessageOptionType = Option<(ClientId, From, Publish)>;
44
// Passes `StoragePlugin::new` to rmqtt's `register!` macro.
// NOTE(review): presumably this generates the plugin registration entry point;
// confirm against the macro definition.
register!(StoragePlugin::new);
46
/// Plugin that persists session state and offline messages in a storage
/// backend and rebuilds offline sessions from it.
#[derive(Plugin)]
struct StoragePlugin {
    /// Server context the plugin was created with.
    scx: ServerContext,
    /// Plugin configuration, shared with the hook handlers.
    cfg: Arc<PluginConfig>,
    /// Handle to the storage backend opened by `init_db`.
    storage_db: DefaultStorageDB,
    /// In-memory copy of the stored sessions, filled by
    /// `load_offline_session_infos`.
    stored_session_infos: StoredSessionInfos,
    /// Hook registration handle obtained from the hook manager.
    register: Box<dyn Register>,
    /// Session manager installed into the server when the plugin starts.
    session_mgr: StorageSessionManager,
    /// Sender feeding the rebuild worker thread.
    rebuild_tx: mpsc::Sender<RebuildChanType>,
}
57
58impl StoragePlugin {
59 #[inline]
60 async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
61 let name = name.into();
62 let mut cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
63 match cfg.storage.typ {
64 #[cfg(feature = "sled")]
65 StorageType::Sled => {
66 cfg.storage.sled.path =
67 cfg.storage.sled.path.replace("{node}", &format!("{}", scx.node.id()));
68 }
69 #[cfg(feature = "redis")]
70 StorageType::Redis => {
71 cfg.storage.redis.prefix =
72 cfg.storage.redis.prefix.replace("{node}", &format!("{}", scx.node.id()));
73 }
74 #[cfg(feature = "redis-cluster")]
75 StorageType::RedisCluster => {
76 cfg.storage.redis_cluster.prefix =
77 cfg.storage.redis_cluster.prefix.replace("{node}", &format!("{}", scx.node.id()));
78 }
79 }
80
81 log::info!("{name} StoragePlugin cfg: {cfg:?}");
82
83 let storage_db = match init_db(&cfg.storage).await {
84 Err(e) => {
85 log::error!("{name} init storage db error, {e}");
86 return Err(e);
87 }
88 Ok(db) => db,
89 };
90
91 let stored_session_infos = StoredSessionInfos::new();
92
93 let register = scx.extends.hook_mgr().register();
94 let session_mgr = StorageSessionManager::new(storage_db.clone(), stored_session_infos.clone());
95
96 let cfg = Arc::new(cfg);
97 let rebuild_tx = Self::start_local_runtime(scx.clone());
98 Ok(Self { scx, cfg, storage_db, stored_session_infos, register, session_mgr, rebuild_tx })
99 }
100
101 async fn load_offline_session_infos(&mut self) -> Result<()> {
102 log::info!("{:?} load_offline_session_infos ...", self.name());
103 let storage_db = self.storage_db.clone();
104 let mut iter_storage_db = storage_db.clone();
105 let mut map_iter = iter_storage_db.map_iter().await?;
107 while let Some(m) = map_iter.next().await {
108 match m {
109 Ok(m) => {
110 let id_key = StoredKey::from(map_stored_key_to_id_bytes(m.name()).to_vec());
111 log::debug!("map_stored_key: {id_key:?}");
112 let basic = match m.get::<_, Basic>(BASIC).await {
113 Err(e) => {
114 log::warn!("{id_key:?} load offline session basic info error, {e:?}");
115 if let Err(e) = storage_db.map_remove(m.name()).await {
116 log::warn!("{id_key:?} remove offline session info error, {e:?}");
117 }
118 continue;
119 }
120 Ok(None) => {
121 log::warn!("{id_key:?} offline session basic info is None");
122 if let Err(e) = storage_db.map_remove(m.name()).await {
123 log::warn!("{id_key:?} remove offline session info error, {e:?}");
124 }
125 continue;
126 }
127 Ok(Some(basic)) => basic,
128 };
129
130 log::debug!("basic: {basic:?}");
131 log::debug!("map key: {id_key:?}");
132 let mut s_info = StoredSessionInfo::from(id_key.clone(), basic);
133
134 match m.get::<_, TimestampMillis>(LAST_TIME).await {
135 Ok(Some(last_time)) => {
136 log::debug!("last_time: {last_time:?}");
137 s_info.set_last_time(last_time);
138 }
139 Ok(None) => {}
140 Err(e) => {
141 log::warn!("{id_key:?} load offline session last time error, {e:?}");
142 }
143 }
144
145 match m.get::<_, SessionSubMap>(SESSION_SUB_MAP).await {
146 Ok(Some(subs)) => {
147 log::debug!("subs: {subs:?}");
148 s_info.set_subs(subs);
149 }
150 Ok(None) => {}
151 Err(e) => {
152 log::warn!("{id_key:?} load offline session subscription info error, {e:?}");
153 }
154 }
155
156 match m.get::<_, DisconnectInfo>(DISCONNECT_INFO).await {
157 Ok(Some(disc_info)) => {
158 log::debug!("disc_info: {disc_info:?}");
159 s_info.set_disconnect_info(disc_info);
160 }
161 Ok(None) => {}
162 Err(e) => {
163 log::warn!("{id_key:?} load offline session disconnect info error, {e:?}");
164 }
165 }
166
167 match m.get::<_, Vec<OutInflightMessage>>(INFLIGHT_MESSAGES).await {
168 Ok(Some(inflights)) => {
169 log::debug!("inflights len: {:?}", inflights.len());
170 s_info.inflight_messages = inflights;
171 }
172 Ok(None) => {}
173 Err(e) => {
174 log::warn!("{id_key:?} load offline session inflight messages error, {e:?}");
175 }
176 }
177
178 self.stored_session_infos.add(s_info);
179 }
180 Err(e) => {
181 log::warn!("load offline session info error, {e:?}");
182 }
183 }
184 }
185 drop(map_iter);
186
187 let mut list_iter = iter_storage_db.list_iter().await?;
188 while let Some(l) = list_iter.next().await {
189 match l {
190 Ok(l) => {
191 let id_key = StoredKey::from(list_stored_key_to_id_bytes(l.name()).to_vec());
192 log::debug!("list_stored_key, id_key: {id_key:?}");
193 match l.all::<OfflineMessageOptionType>().await {
194 Ok(offline_msgs) => {
195 log::debug!("{:?} offline_msgs len: {}", id_key, offline_msgs.len(),);
196 let ok =
197 self.stored_session_infos.set_offline_messages(id_key.clone(), offline_msgs);
198 log::debug!("{id_key:?} stored_session_infos, set_offline_messages res: {ok}");
199 if !ok {
200 if let Err(e) = storage_db.list_remove(l.name()).await {
201 log::warn!("{id_key:?} remove offline messages error, {e:?}");
202 }
203 }
204 }
205 Err(e) => {
206 log::warn!("{id_key:?} load offline messages error, {e:?}");
207 if let Err(e) = storage_db.list_remove(l.name()).await {
208 log::warn!("{id_key:?} remove offline messages error, {e:?}");
209 }
210 }
211 }
212 }
213 Err(e) => {
214 log::warn!("load offline messages error, {e:?}");
215 }
216 }
217 }
218 drop(list_iter);
219
220 for removed_key in self.stored_session_infos.retain_latests() {
221 storage_db.map_remove(make_map_stored_key(removed_key.as_ref())).await?;
222 storage_db.list_remove(make_list_stored_key(removed_key.as_ref())).await?;
223 }
224 log::info!("stored_session_infos len: {:?}", self.stored_session_infos.len());
225
226 Ok(())
227 }
228
229 fn start_local_runtime(scx: ServerContext) -> mpsc::Sender<RebuildChanType> {
230 let (tx, mut rx) = futures::channel::mpsc::channel::<RebuildChanType>(100_000);
231 std::thread::spawn(move || {
232 let local_rt = tokio::runtime::Builder::new_current_thread()
233 .enable_all()
234 .build()
235 .expect("tokio runtime build failed");
236 let local_set = tokio::task::LocalSet::new();
237
238 local_set.block_on(&local_rt, async {
239 while let Some(msg) = rx.next().await {
240 match msg {
241 RebuildChanType::Session(session, session_expiry_interval) => {
242 match SessionState::offline_restart(session.clone(), session_expiry_interval).await {
243 Err(e) => {
244 log::warn!("Rebuild offline sessions error, {e:?}");
245 },
246 Ok(msg_tx) => {
247 let mut session_entry =
248 scx.extends.shared().await.entry(session.id.clone());
249
250 let id = session_entry.id().clone();
251 let task_fut = async move {
252 if let Err(e) = session_entry.set(session, msg_tx).await {
253 log::warn!("{:?} Rebuild offline sessions error, {:?}", session_entry.id(), e);
254 }
255 };
256 let task_exec = &scx.global_exec;
257 if let Err(e) = task_exec.spawn(task_fut).await {
258 log::warn!("{:?} Rebuild offline sessions error, {:?}", id, e.to_string());
259 }
260
261 let completed_count = task_exec.completed_count().await;
262 if completed_count > 0 && completed_count % 5000 == 0 {
263 log::info!(
264 "{:?} Rebuild offline sessions, completed_count: {}, active_count: {}, waiting_count: {}, rate: {:?}",
265 id,
266 task_exec.completed_count().await, task_exec.active_count(), task_exec.waiting_count(), task_exec.rate().await
267 );
268 }
269 }
270 }
271 },
272 RebuildChanType::Done(done_tx) => {
273 let task_exec = &scx.global_exec;
274 let _ = task_exec.flush().await;
275 let _ = done_tx.send(());
276 log::info!(
277 "Rebuild offline sessions, completed_count: {}, active_count: {}, waiting_count: {}, rate: {:?}",
278 task_exec.completed_count().await, task_exec.active_count(), task_exec.waiting_count(), task_exec.rate().await
279 );
280 }
281 }
282 }
283 });
284 log::info!("Offline session rebuilding finished");
285 });
286 tx
287 }
288}
289
290#[async_trait]
291impl Plugin for StoragePlugin {
292 #[inline]
293 async fn init(&mut self) -> Result<()> {
294 log::info!("{} init", self.name());
295 self.register
296 .add(
297 Type::BeforeStartup,
298 Box::new(StorageHandler::new(
299 self.scx.clone(),
300 self.storage_db.clone(),
301 self.cfg.clone(),
302 self.stored_session_infos.clone(),
303 self.rebuild_tx.clone(),
304 )),
305 )
306 .await;
307 self.register
308 .add(
309 Type::OfflineMessage,
310 Box::new(OfflineMessageHandler::new(self.cfg.clone(), self.storage_db.clone())),
311 )
312 .await;
313 self.register
314 .add(
315 Type::OfflineInflightMessages,
316 Box::new(OfflineMessageHandler::new(self.cfg.clone(), self.storage_db.clone())),
317 )
318 .await;
319
320 self.load_offline_session_infos().await?;
321
322 Ok(())
323 }
324
    /// Returns the plugin configuration as JSON, as produced by `to_json()`.
    #[inline]
    async fn get_config(&self) -> Result<serde_json::Value> {
        Ok(self.cfg.to_json())
    }
329
    /// Starts the plugin: replaces the server's session manager with a clone
    /// of this plugin's `session_mgr`, then starts the registered hooks.
    #[inline]
    async fn start(&mut self) -> Result<()> {
        log::info!("{} start", self.name());
        // Install the storage-backed session manager before the hooks go live.
        *self.scx.extends.session_mgr_mut().await = Box::new(self.session_mgr.clone());

        self.register.start().await;
        Ok(())
    }
338
    /// Refuses to stop: logs a warning and always returns `Ok(false)`, because
    /// the storage plugin cannot be stopped once it has been started.
    #[inline]
    async fn stop(&mut self) -> Result<bool> {
        log::warn!("{} stop, if the storage plugin is started, it cannot be stopped", self.name());
        Ok(false)
    }
344
345 #[inline]
346 async fn attrs(&self) -> serde_json::Value {
347 async fn stats(storage_db: &DefaultStorageDB) -> (String, String, String, serde_json::Value) {
348 let max_limit = 1000;
349 let mut session_count = 0;
350 let mut storage_db_map = storage_db.clone();
351 {
352 let now = std::time::Instant::now();
353 let iter = storage_db_map.map_iter().await;
354 if let Ok(mut iter) = iter {
355 while let Some(m) = iter.next().await {
356 if let Ok(m) = m {
357 log::debug!("map: {:?}", StoredKey::from(m.name().to_vec()));
358 }
359 session_count += 1;
360 if session_count >= max_limit {
361 break;
362 }
363 }
364 }
365 log::debug!("map_iter cost time: {:?}", now.elapsed());
366 }
367
368 let mut offline_session_count = 0;
369 let mut offline_message_count = 0;
370 let mut storage_db_list = storage_db.clone();
371 {
372 let now = std::time::Instant::now();
373 let iter = storage_db_list.list_iter().await;
374 if let Ok(mut iter) = iter {
375 while let Some(l) = iter.next().await {
376 if let Ok(mut l) = l {
377 log::debug!("list: {:?}", StoredKey::from(l.name().to_vec()));
378 if let Ok(mut l_iter) = l.iter::<OfflineMessageOptionType>().await {
379 while let Some(msg) = l_iter.next().await {
380 if let Ok(Some(_)) = msg {
381 offline_message_count += 1;
382 if offline_message_count >= max_limit {
383 break;
384 }
385 }
386 }
387 }
388 }
389 offline_session_count += 1;
390 if offline_session_count >= max_limit {
391 break;
392 }
393 }
394 }
395 log::debug!("list_iter cost time: {:?}", now.elapsed());
396 }
397 let session_count = if session_count >= max_limit {
398 format!("{session_count}+")
399 } else {
400 format!("{session_count}")
401 };
402 let offline_session_count = if offline_session_count >= max_limit {
403 format!("{offline_session_count}+")
404 } else {
405 format!("{offline_session_count}")
406 };
407 let offline_message_count = if offline_message_count >= max_limit {
408 format!("{offline_message_count}+")
409 } else {
410 format!("{offline_message_count}")
411 };
412
413 let storage_info = storage_db.info().await.unwrap_or_default();
414
415 (session_count, offline_session_count, offline_message_count, storage_info)
416 }
417
418 let (session_count, offline_session_count, offline_message_count, storage_info) =
419 match tokio::time::timeout(Duration::from_secs(1), stats(&self.storage_db)).await {
420 Ok((session_count, offline_session_count, offline_message_count, storage_info)) => {
421 (session_count, offline_session_count, offline_message_count, storage_info)
422 }
423 Err(_) => ("Elapsed".into(), "Elapsed".into(), "Elapsed".into(), serde_json::Value::Null),
424 };
425
426 json!({
427 "session_count": session_count,
428 "offline_session_count": offline_session_count,
429 "offline_message_count": offline_message_count,
430 "storage_info": storage_info
431 })
432 }
433}
434
/// Hook handler that persists offline messages and offline in-flight
/// messages to the storage backend.
struct OfflineMessageHandler {
    /// Plugin configuration (read here only for logging the storage type).
    cfg: Arc<PluginConfig>,
    /// Handle to the storage backend.
    storage_db: DefaultStorageDB,
}
439
impl OfflineMessageHandler {
    /// Creates a handler that writes through `storage_db`.
    fn new(cfg: Arc<PluginConfig>, storage_db: DefaultStorageDB) -> Self {
        Self { cfg, storage_db }
    }
}
445
446#[async_trait]
447impl Handler for OfflineMessageHandler {
448 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
449 match param {
450 Parameter::OfflineMessage(s, f, p) => {
451 log::debug!(
452 "OfflineMessage storage_type: {:?}, from: {:?}, p: {:?}",
453 self.cfg.storage.typ,
454 f,
455 p
456 );
457 let list_stored_key = make_list_stored_key(s.id.to_string());
458 let storage_db = self.storage_db.clone();
459 let id = s.id.clone();
460 let max_mqueue_len = s.listen_cfg().max_mqueue_len;
461 let p = (*p).clone();
462 let f = f.clone();
463 tokio::spawn(async move {
464 match storage_db.list(list_stored_key.as_ref(), None).await {
465 Ok(offlines_list) => {
466 let res = offlines_list
467 .push_limit::<OfflineMessageOptionType>(
468 &Some((id.client_id.clone(), f, p)),
469 max_mqueue_len,
470 true,
471 )
472 .await;
473 if let Err(e) = res {
474 log::warn!("{id:?} save offline messages error, {e}")
475 }
476 }
477 Err(e) => {
478 log::warn!("{id:?} save offline messages error, {e}")
479 }
480 }
481 });
482 }
483
484 Parameter::OfflineInflightMessages(s, inflight_messages) => {
485 log::debug!(
486 "OfflineInflightMessages storage_type: {:?}, inflight_messages len: {:?}",
487 self.cfg.storage.typ,
488 inflight_messages.len(),
489 );
490 let map_stored_key = make_map_stored_key(s.id.to_string());
491 log::debug!("{:?} map_stored_key: {:?}", s.id, map_stored_key);
492
493 let storage_db = self.storage_db.clone();
494 let inflight_messages = inflight_messages.clone();
495 let id = s.id.clone();
496 tokio::spawn(async move {
497 match storage_db.map(map_stored_key.as_ref(), None).await {
498 Ok(m) => {
499 if let Err(e) = m.insert(INFLIGHT_MESSAGES, &inflight_messages).await {
500 log::warn!("{id:?} save offline inflight messages error, {e}")
501 }
502 }
503 Err(e) => {
504 log::warn!("{id:?} save offline inflight messages error, {e}")
505 }
506 }
507 });
508 }
509
510 _ => {
511 log::error!("unimplemented, {param:?}")
512 }
513 }
514 (true, acc)
515 }
516}
517
/// `BeforeStartup` hook handler that rebuilds the offline sessions loaded
/// from storage.
struct StorageHandler {
    /// Server context used to look up listener configs and create sessions.
    scx: ServerContext,
    /// Handle to the storage backend, used to delete expired records.
    storage_db: DefaultStorageDB,
    /// Plugin configuration (read here only for logging the storage type).
    cfg: Arc<PluginConfig>,
    /// Sessions loaded from storage that are to be rebuilt.
    stored_session_infos: StoredSessionInfos,
    /// Sender feeding the rebuild worker thread.
    rebuild_tx: mpsc::Sender<RebuildChanType>,
}
525
526impl StorageHandler {
    /// Creates a handler from the plugin's shared state.
    fn new(
        scx: ServerContext,
        storage_db: DefaultStorageDB,
        cfg: Arc<PluginConfig>,
        stored_session_infos: StoredSessionInfos,
        rebuild_tx: mpsc::Sender<RebuildChanType>,
    ) -> Self {
        Self { scx, storage_db, cfg, stored_session_infos, rebuild_tx }
    }
536
537 async fn rebuild_offline_sessions(&self, rebuild_done_tx: oneshot::Sender<()>) {
539 let mut offline_sessions_count = 0;
540 for mut entry in self.stored_session_infos.iter_mut() {
541 let (_, storeds) = entry.pair_mut();
542 if let Some(stored) = storeds.iter_mut().next() {
543 let id = stored.basic.id.clone();
544
545 let listen_cfg =
546 if let Some(listen_cfg) = self.scx.listen_cfgs.get(&id.lid).map(|c| c.value().clone()) {
547 listen_cfg
548 } else {
549 log::warn!("tcp listener config is not found, local addr is {:?}", id.local_addr);
550 continue;
551 };
552
553 log::info!("{id:?} listen_cfg: {listen_cfg:?}");
554
555 let fitter = self.scx.extends.fitter_mgr().await.create(
557 stored.basic.conn_info.clone(),
558 id.clone(),
559 listen_cfg.clone(),
560 );
561
562 let session_expiry_interval = session_expiry_interval(
564 fitter.as_ref(),
565 stored.disconnect_info.as_ref(),
566 stored.last_time,
567 )
568 .await;
569 log::debug!("{id:?} session_expiry_interval: {session_expiry_interval:?}");
570 if session_expiry_interval <= 0 {
571 log::debug!(
572 "{:?} session is expiry, {:?}, id_key: {:?}, {:?}, {:?}",
573 id,
574 session_expiry_interval,
575 stored.id_key,
576 make_map_stored_key(stored.id_key.as_ref()),
577 make_list_stored_key(stored.id_key.as_ref())
578 );
579 let storage_db = self.storage_db.clone();
580 if let Err(e) = storage_db.map_remove(make_map_stored_key(stored.id_key.as_ref())).await {
581 log::warn!("{id:?} remove map error, {e:?}");
582 }
583 if let Err(e) = storage_db.list_remove(make_list_stored_key(stored.id_key.as_ref())).await
584 {
585 log::warn!("{id:?} remove list error, {e:?}");
586 }
587 continue;
589 }
590 offline_sessions_count += 1;
591
592 if stored.disconnect_info.is_none() {
593 stored.disconnect_info = Some(DisconnectInfo::new(stored.last_time));
594 }
595
596 let max_inflight = fitter.max_inflight();
597 let max_mqueue_len = fitter.max_mqueue_len();
598 let subs = stored.subs.take().map(SessionSubs::from).unwrap_or_else(SessionSubs::new);
599
600 let session = match Session::new(
601 id.clone(),
602 self.scx.clone(),
603 max_mqueue_len,
604 listen_cfg,
605 fitter,
606 None,
607 max_inflight,
608 stored.basic.created_at,
609 stored.basic.conn_info.clone(),
610 false,
611 false,
612 false,
613 stored.basic.connected_at,
614 subs,
615 stored.disconnect_info.take(),
616 None,
617 )
618 .await
619 {
620 Ok(s) => s,
621 Err(e) => {
622 log::warn!("rebuild session offline message error, create session error, {e:?}");
623 continue;
624 }
625 };
626
627 let deliver_queue = session.deliver_queue();
628 for item in stored.offline_messages.drain(..) {
629 if let Err((f, p)) = deliver_queue.push(item) {
630 log::warn!("rebuild session offline message error, deliver queue is full, from: {f:?}, publish: {p:?}");
631 }
632 }
633
634 let out_inflight = session.out_inflight();
635 for item in stored.inflight_messages.drain(..) {
636 out_inflight.write().await.push_back(item);
637 }
638
639 if let Err(e) = self
640 .rebuild_tx
641 .clone()
642 .send(RebuildChanType::Session(
643 session,
644 Duration::from_millis(session_expiry_interval as u64),
645 ))
646 .await
647 {
648 log::error!("rebuild offline sessions error, {e:?}");
649 }
650 }
651 }
652 log::info!("offline_sessions_count: {offline_sessions_count}");
653 let _ = self.rebuild_tx.clone().send(RebuildChanType::Done(rebuild_done_tx)).await;
654 }
655}
656
657#[async_trait]
658impl Handler for StorageHandler {
659 async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
660 match param {
661 Parameter::BeforeStartup => {
662 log::info!(
663 "BeforeStartup storage_type: {:?}, stored_session_infos len: {}",
664 self.cfg.storage.typ,
665 self.stored_session_infos.len()
666 );
667 let (rebuild_done_tx, rebuild_done_rx) = oneshot::channel::<()>();
668 self.rebuild_offline_sessions(rebuild_done_tx).await;
669 let _ = rebuild_done_rx.await;
670 }
671 _ => {
672 log::error!("unimplemented, {param:?}")
673 }
674 }
675 (true, acc)
676 }
677}
678
679#[inline]
680async fn session_expiry_interval(
681 fitter: &dyn Fitter,
682 disconnect_info: Option<&DisconnectInfo>,
683 last_time: TimestampMillis,
684) -> TimestampMillis {
685 let disconnected_at = disconnect_info.map(|d| d.disconnected_at).unwrap_or_default();
686 let disconnected_at = if disconnected_at <= 0 { last_time } else { disconnected_at };
687 fitter.session_expiry_interval(disconnect_info.and_then(|d| d.mqtt_disconnect.as_ref())).as_millis()
688 as i64
689 - (timestamp_millis() - disconnected_at)
690}
691
692#[inline]
693pub(crate) fn make_map_stored_key<T: AsRef<[u8]>>(id: T) -> StoredKey {
694 let mut key = Vec::from("map-");
695 key.extend_from_slice(id.as_ref());
696 Bytes::from(key)
697}
698
/// Strips a leading `map-` from `stored_key`; a key without that prefix is
/// returned unchanged.
#[inline]
pub(crate) fn map_stored_key_to_id_bytes(stored_key: &[u8]) -> &[u8] {
    stored_key.strip_prefix(b"map-").unwrap_or(stored_key)
}
707
708#[inline]
709pub(crate) fn make_list_stored_key<T: AsRef<[u8]>>(id: T) -> StoredKey {
710 let mut key = Vec::from("list-");
711 key.extend_from_slice(id.as_ref());
712 Bytes::from(key)
713}
714
/// Strips a leading `list-` from `stored_key`; a key without that prefix is
/// returned unchanged.
#[inline]
pub(crate) fn list_stored_key_to_id_bytes(stored_key: &[u8]) -> &[u8] {
    stored_key.strip_prefix(b"list-").unwrap_or(stored_key)
}