rmqtt_session_storage/
lib.rs

1#![deny(unsafe_code)]
2
3use std::convert::From as _;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::channel::{mpsc, oneshot};
10use futures::{SinkExt, StreamExt};
11use serde_json::{self, json};
12
13use rmqtt::{
14    fitter::Fitter,
15    hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
16    plugin::{PackageInfo, Plugin},
17    register,
18    session::Session,
19    types::DisconnectInfo,
20    types::{ClientId, From, Publish, SessionSubMap, SessionSubs, TimestampMillis},
21    utils::timestamp_millis,
22    Result,
23};
24
25use rmqtt_storage::{init_db, DefaultStorageDB, List, Map, StorageType};
26
27use config::PluginConfig;
28use rmqtt::context::ServerContext;
29use rmqtt::inflight::OutInflightMessage;
30use rmqtt::macros::Plugin;
31use rmqtt::session::SessionState;
32use session::{Basic, StorageSessionManager, StoredSessionInfo, StoredSessionInfos};
33use session::{StoredKey, BASIC, DISCONNECT_INFO, INFLIGHT_MESSAGES, LAST_TIME, SESSION_SUB_MAP};
34
35mod config;
36mod session;
37
38enum RebuildChanType {
39    Session(Session, Duration),
40    Done(oneshot::Sender<()>),
41}
42
43type OfflineMessageOptionType = Option<(ClientId, From, Publish)>;
44
45register!(StoragePlugin::new);
46
47#[derive(Plugin)]
48struct StoragePlugin {
49    scx: ServerContext,
50    cfg: Arc<PluginConfig>,
51    storage_db: DefaultStorageDB,
52    stored_session_infos: StoredSessionInfos,
53    register: Box<dyn Register>,
54    session_mgr: StorageSessionManager,
55    rebuild_tx: mpsc::Sender<RebuildChanType>,
56}
57
58impl StoragePlugin {
59    #[inline]
60    async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
61        let name = name.into();
62        let mut cfg = scx.plugins.read_config_default::<PluginConfig>(&name)?;
63        match cfg.storage.typ {
64            #[cfg(feature = "sled")]
65            StorageType::Sled => {
66                cfg.storage.sled.path =
67                    cfg.storage.sled.path.replace("{node}", &format!("{}", scx.node.id()));
68            }
69            #[cfg(feature = "redis")]
70            StorageType::Redis => {
71                cfg.storage.redis.prefix =
72                    cfg.storage.redis.prefix.replace("{node}", &format!("{}", scx.node.id()));
73            }
74            #[cfg(feature = "redis-cluster")]
75            StorageType::RedisCluster => {
76                cfg.storage.redis_cluster.prefix =
77                    cfg.storage.redis_cluster.prefix.replace("{node}", &format!("{}", scx.node.id()));
78            }
79        }
80
81        log::info!("{name} StoragePlugin cfg: {cfg:?}");
82
83        let storage_db = match init_db(&cfg.storage).await {
84            Err(e) => {
85                log::error!("{name} init storage db error, {e}");
86                return Err(e);
87            }
88            Ok(db) => db,
89        };
90
91        let stored_session_infos = StoredSessionInfos::new();
92
93        let register = scx.extends.hook_mgr().register();
94        let session_mgr = StorageSessionManager::new(storage_db.clone(), stored_session_infos.clone());
95
96        let cfg = Arc::new(cfg);
97        let rebuild_tx = Self::start_local_runtime(scx.clone());
98        Ok(Self { scx, cfg, storage_db, stored_session_infos, register, session_mgr, rebuild_tx })
99    }
100
101    async fn load_offline_session_infos(&mut self) -> Result<()> {
102        log::info!("{:?} load_offline_session_infos ...", self.name());
103        let storage_db = self.storage_db.clone();
104        let mut iter_storage_db = storage_db.clone();
105        //Load offline session information from the database
106        let mut map_iter = iter_storage_db.map_iter().await?;
107        while let Some(m) = map_iter.next().await {
108            match m {
109                Ok(m) => {
110                    let id_key = StoredKey::from(map_stored_key_to_id_bytes(m.name()).to_vec());
111                    log::debug!("map_stored_key: {id_key:?}");
112                    let basic = match m.get::<_, Basic>(BASIC).await {
113                        Err(e) => {
114                            log::warn!("{id_key:?} load offline session basic info error, {e:?}");
115                            if let Err(e) = storage_db.map_remove(m.name()).await {
116                                log::warn!("{id_key:?} remove offline session info error, {e:?}");
117                            }
118                            continue;
119                        }
120                        Ok(None) => {
121                            log::warn!("{id_key:?} offline session basic info is None");
122                            if let Err(e) = storage_db.map_remove(m.name()).await {
123                                log::warn!("{id_key:?} remove offline session info error, {e:?}");
124                            }
125                            continue;
126                        }
127                        Ok(Some(basic)) => basic,
128                    };
129
130                    log::debug!("basic: {basic:?}");
131                    log::debug!("map key: {id_key:?}");
132                    let mut s_info = StoredSessionInfo::from(id_key.clone(), basic);
133
134                    match m.get::<_, TimestampMillis>(LAST_TIME).await {
135                        Ok(Some(last_time)) => {
136                            log::debug!("last_time: {last_time:?}");
137                            s_info.set_last_time(last_time);
138                        }
139                        Ok(None) => {}
140                        Err(e) => {
141                            log::warn!("{id_key:?} load offline session last time error, {e:?}");
142                        }
143                    }
144
145                    match m.get::<_, SessionSubMap>(SESSION_SUB_MAP).await {
146                        Ok(Some(subs)) => {
147                            log::debug!("subs: {subs:?}");
148                            s_info.set_subs(subs);
149                        }
150                        Ok(None) => {}
151                        Err(e) => {
152                            log::warn!("{id_key:?} load offline session subscription info error, {e:?}");
153                        }
154                    }
155
156                    match m.get::<_, DisconnectInfo>(DISCONNECT_INFO).await {
157                        Ok(Some(disc_info)) => {
158                            log::debug!("disc_info: {disc_info:?}");
159                            s_info.set_disconnect_info(disc_info);
160                        }
161                        Ok(None) => {}
162                        Err(e) => {
163                            log::warn!("{id_key:?} load offline session disconnect info error, {e:?}");
164                        }
165                    }
166
167                    match m.get::<_, Vec<OutInflightMessage>>(INFLIGHT_MESSAGES).await {
168                        Ok(Some(inflights)) => {
169                            log::debug!("inflights len: {:?}", inflights.len());
170                            s_info.inflight_messages = inflights;
171                        }
172                        Ok(None) => {}
173                        Err(e) => {
174                            log::warn!("{id_key:?} load offline session inflight messages error, {e:?}");
175                        }
176                    }
177
178                    self.stored_session_infos.add(s_info);
179                }
180                Err(e) => {
181                    log::warn!("load offline session info error, {e:?}");
182                }
183            }
184        }
185        drop(map_iter);
186
187        let mut list_iter = iter_storage_db.list_iter().await?;
188        while let Some(l) = list_iter.next().await {
189            match l {
190                Ok(l) => {
191                    let id_key = StoredKey::from(list_stored_key_to_id_bytes(l.name()).to_vec());
192                    log::debug!("list_stored_key, id_key: {id_key:?}");
193                    match l.all::<OfflineMessageOptionType>().await {
194                        Ok(offline_msgs) => {
195                            log::debug!("{:?} offline_msgs len: {}", id_key, offline_msgs.len(),);
196                            let ok =
197                                self.stored_session_infos.set_offline_messages(id_key.clone(), offline_msgs);
198                            log::debug!("{id_key:?} stored_session_infos, set_offline_messages res: {ok}");
199                            if !ok {
200                                if let Err(e) = storage_db.list_remove(l.name()).await {
201                                    log::warn!("{id_key:?} remove offline messages error, {e:?}");
202                                }
203                            }
204                        }
205                        Err(e) => {
206                            log::warn!("{id_key:?} load offline messages error, {e:?}");
207                            if let Err(e) = storage_db.list_remove(l.name()).await {
208                                log::warn!("{id_key:?} remove offline messages error, {e:?}");
209                            }
210                        }
211                    }
212                }
213                Err(e) => {
214                    log::warn!("load offline messages error, {e:?}");
215                }
216            }
217        }
218        drop(list_iter);
219
220        for removed_key in self.stored_session_infos.retain_latests() {
221            storage_db.map_remove(make_map_stored_key(removed_key.as_ref())).await?;
222            storage_db.list_remove(make_list_stored_key(removed_key.as_ref())).await?;
223        }
224        log::info!("stored_session_infos len: {:?}", self.stored_session_infos.len());
225
226        Ok(())
227    }
228
229    fn start_local_runtime(scx: ServerContext) -> mpsc::Sender<RebuildChanType> {
230        let (tx, mut rx) = futures::channel::mpsc::channel::<RebuildChanType>(100_000);
231        std::thread::spawn(move || {
232            let local_rt = tokio::runtime::Builder::new_current_thread()
233                .enable_all()
234                .build()
235                .expect("tokio runtime build failed");
236            let local_set = tokio::task::LocalSet::new();
237
238            local_set.block_on(&local_rt, async {
239                while let Some(msg) = rx.next().await {
240                    match msg {
241                        RebuildChanType::Session(session, session_expiry_interval)  => {
242                            match SessionState::offline_restart(session.clone(), session_expiry_interval).await {
243                                Err(e) => {
244                                    log::warn!("Rebuild offline sessions error, {e:?}");
245                                },
246                                Ok(msg_tx) => {
247                                    let mut session_entry =
248                                        scx.extends.shared().await.entry(session.id.clone());
249
250                                    let id = session_entry.id().clone();
251                                    let task_fut = async move {
252                                        if let Err(e) = session_entry.set(session, msg_tx).await {
253                                            log::warn!("{:?} Rebuild offline sessions error, {:?}", session_entry.id(), e);
254                                        }
255                                    };
256                                    let task_exec = &scx.global_exec;
257                                    if let Err(e) = task_exec.spawn(task_fut).await {
258                                        log::warn!("{:?} Rebuild offline sessions error, {:?}", id, e.to_string());
259                                    }
260
261                                    let completed_count = task_exec.completed_count().await;
262                                    if completed_count > 0 && completed_count % 5000 == 0 {
263                                        log::info!(
264                                        "{:?} Rebuild offline sessions, completed_count: {}, active_count: {}, waiting_count: {}, rate: {:?}",
265                                        id,
266                                        task_exec.completed_count().await, task_exec.active_count(), task_exec.waiting_count(), task_exec.rate().await
267                                    );
268                                    }
269                                }
270                            }
271                        },
272                        RebuildChanType::Done(done_tx) => {
273                            let task_exec = &scx.global_exec;
274                            let _ = task_exec.flush().await;
275                            let _ = done_tx.send(());
276                            log::info!(
277                                "Rebuild offline sessions, completed_count: {}, active_count: {}, waiting_count: {}, rate: {:?}",
278                                task_exec.completed_count().await, task_exec.active_count(), task_exec.waiting_count(), task_exec.rate().await
279                            );
280                        }
281                    }
282                }
283            });
284            log::info!("Offline session rebuilding finished");
285        });
286        tx
287    }
288}
289
290#[async_trait]
291impl Plugin for StoragePlugin {
292    #[inline]
293    async fn init(&mut self) -> Result<()> {
294        log::info!("{} init", self.name());
295        self.register
296            .add(
297                Type::BeforeStartup,
298                Box::new(StorageHandler::new(
299                    self.scx.clone(),
300                    self.storage_db.clone(),
301                    self.cfg.clone(),
302                    self.stored_session_infos.clone(),
303                    self.rebuild_tx.clone(),
304                )),
305            )
306            .await;
307        self.register
308            .add(
309                Type::OfflineMessage,
310                Box::new(OfflineMessageHandler::new(self.cfg.clone(), self.storage_db.clone())),
311            )
312            .await;
313        self.register
314            .add(
315                Type::OfflineInflightMessages,
316                Box::new(OfflineMessageHandler::new(self.cfg.clone(), self.storage_db.clone())),
317            )
318            .await;
319
320        self.load_offline_session_infos().await?;
321
322        Ok(())
323    }
324
325    #[inline]
326    async fn get_config(&self) -> Result<serde_json::Value> {
327        Ok(self.cfg.to_json())
328    }
329
330    #[inline]
331    async fn start(&mut self) -> Result<()> {
332        log::info!("{} start", self.name());
333        *self.scx.extends.session_mgr_mut().await = Box::new(self.session_mgr.clone());
334
335        self.register.start().await;
336        Ok(())
337    }
338
339    #[inline]
340    async fn stop(&mut self) -> Result<bool> {
341        log::warn!("{} stop, if the storage plugin is started, it cannot be stopped", self.name());
342        Ok(false)
343    }
344
345    #[inline]
346    async fn attrs(&self) -> serde_json::Value {
347        async fn stats(storage_db: &DefaultStorageDB) -> (String, String, String, serde_json::Value) {
348            let max_limit = 1000;
349            let mut session_count = 0;
350            let mut storage_db_map = storage_db.clone();
351            {
352                let now = std::time::Instant::now();
353                let iter = storage_db_map.map_iter().await;
354                if let Ok(mut iter) = iter {
355                    while let Some(m) = iter.next().await {
356                        if let Ok(m) = m {
357                            log::debug!("map: {:?}", StoredKey::from(m.name().to_vec()));
358                        }
359                        session_count += 1;
360                        if session_count >= max_limit {
361                            break;
362                        }
363                    }
364                }
365                log::debug!("map_iter cost time: {:?}", now.elapsed());
366            }
367
368            let mut offline_session_count = 0;
369            let mut offline_message_count = 0;
370            let mut storage_db_list = storage_db.clone();
371            {
372                let now = std::time::Instant::now();
373                let iter = storage_db_list.list_iter().await;
374                if let Ok(mut iter) = iter {
375                    while let Some(l) = iter.next().await {
376                        if let Ok(mut l) = l {
377                            log::debug!("list: {:?}", StoredKey::from(l.name().to_vec()));
378                            if let Ok(mut l_iter) = l.iter::<OfflineMessageOptionType>().await {
379                                while let Some(msg) = l_iter.next().await {
380                                    if let Ok(Some(_)) = msg {
381                                        offline_message_count += 1;
382                                        if offline_message_count >= max_limit {
383                                            break;
384                                        }
385                                    }
386                                }
387                            }
388                        }
389                        offline_session_count += 1;
390                        if offline_session_count >= max_limit {
391                            break;
392                        }
393                    }
394                }
395                log::debug!("list_iter cost time: {:?}", now.elapsed());
396            }
397            let session_count = if session_count >= max_limit {
398                format!("{session_count}+")
399            } else {
400                format!("{session_count}")
401            };
402            let offline_session_count = if offline_session_count >= max_limit {
403                format!("{offline_session_count}+")
404            } else {
405                format!("{offline_session_count}")
406            };
407            let offline_message_count = if offline_message_count >= max_limit {
408                format!("{offline_message_count}+")
409            } else {
410                format!("{offline_message_count}")
411            };
412
413            let storage_info = storage_db.info().await.unwrap_or_default();
414
415            (session_count, offline_session_count, offline_message_count, storage_info)
416        }
417
418        let (session_count, offline_session_count, offline_message_count, storage_info) =
419            match tokio::time::timeout(Duration::from_secs(1), stats(&self.storage_db)).await {
420                Ok((session_count, offline_session_count, offline_message_count, storage_info)) => {
421                    (session_count, offline_session_count, offline_message_count, storage_info)
422                }
423                Err(_) => ("Elapsed".into(), "Elapsed".into(), "Elapsed".into(), serde_json::Value::Null),
424            };
425
426        json!({
427            "session_count": session_count,
428            "offline_session_count": offline_session_count,
429            "offline_message_count": offline_message_count,
430            "storage_info": storage_info
431        })
432    }
433}
434
435struct OfflineMessageHandler {
436    cfg: Arc<PluginConfig>,
437    storage_db: DefaultStorageDB,
438}
439
440impl OfflineMessageHandler {
441    fn new(cfg: Arc<PluginConfig>, storage_db: DefaultStorageDB) -> Self {
442        Self { cfg, storage_db }
443    }
444}
445
446#[async_trait]
447impl Handler for OfflineMessageHandler {
448    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
449        match param {
450            Parameter::OfflineMessage(s, f, p) => {
451                log::debug!(
452                    "OfflineMessage storage_type: {:?}, from: {:?}, p: {:?}",
453                    self.cfg.storage.typ,
454                    f,
455                    p
456                );
457                let list_stored_key = make_list_stored_key(s.id.to_string());
458                let storage_db = self.storage_db.clone();
459                let id = s.id.clone();
460                let max_mqueue_len = s.listen_cfg().max_mqueue_len;
461                let p = (*p).clone();
462                let f = f.clone();
463                tokio::spawn(async move {
464                    match storage_db.list(list_stored_key.as_ref(), None).await {
465                        Ok(offlines_list) => {
466                            let res = offlines_list
467                                .push_limit::<OfflineMessageOptionType>(
468                                    &Some((id.client_id.clone(), f, p)),
469                                    max_mqueue_len,
470                                    true,
471                                )
472                                .await;
473                            if let Err(e) = res {
474                                log::warn!("{id:?} save offline messages error, {e}")
475                            }
476                        }
477                        Err(e) => {
478                            log::warn!("{id:?} save offline messages error, {e}")
479                        }
480                    }
481                });
482            }
483
484            Parameter::OfflineInflightMessages(s, inflight_messages) => {
485                log::debug!(
486                    "OfflineInflightMessages storage_type: {:?}, inflight_messages len: {:?}",
487                    self.cfg.storage.typ,
488                    inflight_messages.len(),
489                );
490                let map_stored_key = make_map_stored_key(s.id.to_string());
491                log::debug!("{:?} map_stored_key: {:?}", s.id, map_stored_key);
492
493                let storage_db = self.storage_db.clone();
494                let inflight_messages = inflight_messages.clone();
495                let id = s.id.clone();
496                tokio::spawn(async move {
497                    match storage_db.map(map_stored_key.as_ref(), None).await {
498                        Ok(m) => {
499                            if let Err(e) = m.insert(INFLIGHT_MESSAGES, &inflight_messages).await {
500                                log::warn!("{id:?} save offline inflight messages error, {e}")
501                            }
502                        }
503                        Err(e) => {
504                            log::warn!("{id:?} save offline inflight messages error, {e}")
505                        }
506                    }
507                });
508            }
509
510            _ => {
511                log::error!("unimplemented, {param:?}")
512            }
513        }
514        (true, acc)
515    }
516}
517
518struct StorageHandler {
519    scx: ServerContext,
520    storage_db: DefaultStorageDB,
521    cfg: Arc<PluginConfig>,
522    stored_session_infos: StoredSessionInfos,
523    rebuild_tx: mpsc::Sender<RebuildChanType>,
524}
525
526impl StorageHandler {
527    fn new(
528        scx: ServerContext,
529        storage_db: DefaultStorageDB,
530        cfg: Arc<PluginConfig>,
531        stored_session_infos: StoredSessionInfos,
532        rebuild_tx: mpsc::Sender<RebuildChanType>,
533    ) -> Self {
534        Self { scx, storage_db, cfg, stored_session_infos, rebuild_tx }
535    }
536
537    //Rebuild offline sessions.
538    async fn rebuild_offline_sessions(&self, rebuild_done_tx: oneshot::Sender<()>) {
539        let mut offline_sessions_count = 0;
540        for mut entry in self.stored_session_infos.iter_mut() {
541            let (_, storeds) = entry.pair_mut();
542            if let Some(stored) = storeds.iter_mut().next() {
543                let id = stored.basic.id.clone();
544
545                let listen_cfg =
546                    if let Some(listen_cfg) = self.scx.listen_cfgs.get(&id.lid).map(|c| c.value().clone()) {
547                        listen_cfg
548                    } else {
549                        log::warn!("tcp listener config is not found, local addr is {:?}", id.local_addr);
550                        continue;
551                    };
552
553                log::info!("{id:?} listen_cfg: {listen_cfg:?}");
554
555                //create fitter
556                let fitter = self.scx.extends.fitter_mgr().await.create(
557                    stored.basic.conn_info.clone(),
558                    id.clone(),
559                    listen_cfg.clone(),
560                );
561
562                //check session expiry interval
563                let session_expiry_interval = session_expiry_interval(
564                    fitter.as_ref(),
565                    stored.disconnect_info.as_ref(),
566                    stored.last_time,
567                )
568                .await;
569                log::debug!("{id:?} session_expiry_interval: {session_expiry_interval:?}");
570                if session_expiry_interval <= 0 {
571                    log::debug!(
572                        "{:?} session is expiry, {:?}, id_key: {:?}, {:?}, {:?}",
573                        id,
574                        session_expiry_interval,
575                        stored.id_key,
576                        make_map_stored_key(stored.id_key.as_ref()),
577                        make_list_stored_key(stored.id_key.as_ref())
578                    );
579                    let storage_db = self.storage_db.clone();
580                    if let Err(e) = storage_db.map_remove(make_map_stored_key(stored.id_key.as_ref())).await {
581                        log::warn!("{id:?} remove map error, {e:?}");
582                    }
583                    if let Err(e) = storage_db.list_remove(make_list_stored_key(stored.id_key.as_ref())).await
584                    {
585                        log::warn!("{id:?} remove list error, {e:?}");
586                    }
587                    //session is expiry
588                    continue;
589                }
590                offline_sessions_count += 1;
591
592                if stored.disconnect_info.is_none() {
593                    stored.disconnect_info = Some(DisconnectInfo::new(stored.last_time));
594                }
595
596                let max_inflight = fitter.max_inflight();
597                let max_mqueue_len = fitter.max_mqueue_len();
598                let subs = stored.subs.take().map(SessionSubs::from).unwrap_or_else(SessionSubs::new);
599
600                let session = match Session::new(
601                    id.clone(),
602                    self.scx.clone(),
603                    max_mqueue_len,
604                    listen_cfg,
605                    fitter,
606                    None,
607                    max_inflight,
608                    stored.basic.created_at,
609                    stored.basic.conn_info.clone(),
610                    false,
611                    false,
612                    false,
613                    stored.basic.connected_at,
614                    subs,
615                    stored.disconnect_info.take(),
616                    None,
617                )
618                .await
619                {
620                    Ok(s) => s,
621                    Err(e) => {
622                        log::warn!("rebuild session offline message error, create session error, {e:?}");
623                        continue;
624                    }
625                };
626
627                let deliver_queue = session.deliver_queue();
628                for item in stored.offline_messages.drain(..) {
629                    if let Err((f, p)) = deliver_queue.push(item) {
630                        log::warn!("rebuild session offline message error, deliver queue is full, from: {f:?}, publish: {p:?}");
631                    }
632                }
633
634                let out_inflight = session.out_inflight();
635                for item in stored.inflight_messages.drain(..) {
636                    out_inflight.write().await.push_back(item);
637                }
638
639                if let Err(e) = self
640                    .rebuild_tx
641                    .clone()
642                    .send(RebuildChanType::Session(
643                        session,
644                        Duration::from_millis(session_expiry_interval as u64),
645                    ))
646                    .await
647                {
648                    log::error!("rebuild offline sessions error, {e:?}");
649                }
650            }
651        }
652        log::info!("offline_sessions_count: {offline_sessions_count}");
653        let _ = self.rebuild_tx.clone().send(RebuildChanType::Done(rebuild_done_tx)).await;
654    }
655}
656
657#[async_trait]
658impl Handler for StorageHandler {
659    async fn hook(&self, param: &Parameter, acc: Option<HookResult>) -> ReturnType {
660        match param {
661            Parameter::BeforeStartup => {
662                log::info!(
663                    "BeforeStartup storage_type: {:?}, stored_session_infos len: {}",
664                    self.cfg.storage.typ,
665                    self.stored_session_infos.len()
666                );
667                let (rebuild_done_tx, rebuild_done_rx) = oneshot::channel::<()>();
668                self.rebuild_offline_sessions(rebuild_done_tx).await;
669                let _ = rebuild_done_rx.await;
670            }
671            _ => {
672                log::error!("unimplemented, {param:?}")
673            }
674        }
675        (true, acc)
676    }
677}
678
679#[inline]
680async fn session_expiry_interval(
681    fitter: &dyn Fitter,
682    disconnect_info: Option<&DisconnectInfo>,
683    last_time: TimestampMillis,
684) -> TimestampMillis {
685    let disconnected_at = disconnect_info.map(|d| d.disconnected_at).unwrap_or_default();
686    let disconnected_at = if disconnected_at <= 0 { last_time } else { disconnected_at };
687    fitter.session_expiry_interval(disconnect_info.and_then(|d| d.mqtt_disconnect.as_ref())).as_millis()
688        as i64
689        - (timestamp_millis() - disconnected_at)
690}
691
692#[inline]
693pub(crate) fn make_map_stored_key<T: AsRef<[u8]>>(id: T) -> StoredKey {
694    let mut key = Vec::from("map-");
695    key.extend_from_slice(id.as_ref());
696    Bytes::from(key)
697}
698
699#[inline]
700pub(crate) fn map_stored_key_to_id_bytes(stored_key: &[u8]) -> &[u8] {
701    if stored_key.starts_with(b"map-") {
702        stored_key[4..].as_ref()
703    } else {
704        stored_key
705    }
706}
707
708#[inline]
709pub(crate) fn make_list_stored_key<T: AsRef<[u8]>>(id: T) -> StoredKey {
710    let mut key = Vec::from("list-");
711    key.extend_from_slice(id.as_ref());
712    Bytes::from(key)
713}
714
715#[inline]
716pub(crate) fn list_stored_key_to_id_bytes(stored_key: &[u8]) -> &[u8] {
717    if stored_key.starts_with(b"list-") {
718        stored_key[5..].as_ref()
719    } else {
720        stored_key
721    }
722}