Skip to main content

loro_internal/
awareness.rs

1//! Ephemeral presence utilities.
2//!
3//! `EphemeralStore` is the recommended API: a timestamped, last-write-wins key-value
4//! store for transient presence data (cursors, selections, etc.). It supports:
5//! - Per-key timeouts: expired entries are skipped by `encode`/`encode_all` and removed
6//!   by `remove_outdated`.
7//! - Import/export via `encode`/`apply` for syncing between peers.
8//! - Subscriptions for both local updates (raw bytes to send) and merged updates.
9//!
10//! The legacy `Awareness` type remains for backward compatibility but is deprecated in
11//! favor of `EphemeralStore`.
12use crate::sync::Mutex;
13use std::sync::atomic::AtomicI64;
14use std::sync::Arc;
15
16use loro_common::{LoroValue, PeerID};
17use rustc_hash::FxHashMap;
18use serde::de::{DeserializeSeed, EnumAccess, MapAccess, SeqAccess, VariantAccess};
19use serde::{Deserialize, Deserializer, Serialize};
20
21use crate::change::{get_sys_timestamp, Timestamp};
22use crate::{SubscriberSetWithQueue, Subscription};
23
24/// `Awareness` is a structure that tracks the ephemeral state of peers.
25///
26/// It can be used to synchronize cursor positions, selections, and the names of the peers.
27///
28/// The state of a specific peer is expected to be removed after a specified timeout. Use
29/// `remove_outdated` to eliminate outdated states.
30#[derive(Debug, Clone)]
31#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
32pub struct Awareness {
33    peer: PeerID,
34    peers: FxHashMap<PeerID, PeerInfo>,
35    timeout: i64,
36}
37
38#[derive(Debug, Clone)]
39pub struct PeerInfo {
40    pub state: LoroValue,
41    pub counter: i32,
42    // This field is generated locally
43    pub timestamp: i64,
44}
45
46#[derive(Serialize, Deserialize)]
47struct EncodedPeerInfo {
48    peer: PeerID,
49    counter: i32,
50    record: LoroValue,
51}
52
53#[derive(Deserialize)]
54struct DecodedPeerInfo {
55    peer: PeerID,
56    counter: i32,
57    #[serde(deserialize_with = "deserialize_depth_limited_loro_value")]
58    record: LoroValue,
59}
60
61#[allow(deprecated)]
62impl Awareness {
63    pub fn new(peer: PeerID, timeout: i64) -> Awareness {
64        Awareness {
65            peer,
66            timeout,
67            peers: FxHashMap::default(),
68        }
69    }
70
71    pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
72        let mut peers_info = Vec::new();
73        let now = get_sys_timestamp() as Timestamp;
74        for peer in peers {
75            if let Some(peer_info) = self.peers.get(peer) {
76                if now - peer_info.timestamp > self.timeout {
77                    continue;
78                }
79
80                let encoded_peer_info = EncodedPeerInfo {
81                    peer: *peer,
82                    record: peer_info.state.clone(),
83                    counter: peer_info.counter,
84                };
85                peers_info.push(encoded_peer_info);
86            }
87        }
88
89        postcard::to_allocvec(&peers_info).unwrap()
90    }
91
92    pub fn encode_all(&self) -> Vec<u8> {
93        let mut peers_info = Vec::new();
94        let now = get_sys_timestamp() as Timestamp;
95        for (peer, peer_info) in self.peers.iter() {
96            if now - peer_info.timestamp > self.timeout {
97                continue;
98            }
99
100            let encoded_peer_info = EncodedPeerInfo {
101                peer: *peer,
102                record: peer_info.state.clone(),
103                counter: peer_info.counter,
104            };
105            peers_info.push(encoded_peer_info);
106        }
107
108        postcard::to_allocvec(&peers_info).unwrap()
109    }
110
111    /// Returns (updated, added)
112    pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
113        let peers_info: Vec<DecodedPeerInfo> =
114            postcard::from_bytes(encoded_peers_info).expect("Failed to decode awareness data");
115        let mut changed_peers = Vec::new();
116        let mut added_peers = Vec::new();
117        let now = get_sys_timestamp() as Timestamp;
118        for peer_info in peers_info {
119            match self.peers.get(&peer_info.peer) {
120                Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
121                    // do nothing
122                }
123                _ => {
124                    let old = self.peers.insert(
125                        peer_info.peer,
126                        PeerInfo {
127                            counter: peer_info.counter,
128                            state: peer_info.record,
129                            timestamp: now,
130                        },
131                    );
132                    if old.is_some() {
133                        changed_peers.push(peer_info.peer);
134                    } else {
135                        added_peers.push(peer_info.peer);
136                    }
137                }
138            }
139        }
140
141        (changed_peers, added_peers)
142    }
143
144    pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
145        self._set_local_state(value.into());
146    }
147
148    fn _set_local_state(&mut self, value: LoroValue) {
149        let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
150            state: Default::default(),
151            counter: 0,
152            timestamp: 0,
153        });
154
155        peer.state = value;
156        peer.counter += 1;
157        peer.timestamp = get_sys_timestamp() as Timestamp;
158    }
159
160    pub fn get_local_state(&self) -> Option<LoroValue> {
161        self.peers.get(&self.peer).map(|x| x.state.clone())
162    }
163
164    pub fn remove_outdated(&mut self) -> Vec<PeerID> {
165        let now = get_sys_timestamp() as Timestamp;
166        let mut removed = Vec::new();
167        self.peers.retain(|id, v| {
168            if now - v.timestamp > self.timeout {
169                removed.push(*id);
170                false
171            } else {
172                true
173            }
174        });
175
176        removed
177    }
178
179    pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
180        &self.peers
181    }
182
183    pub fn peer(&self) -> PeerID {
184        self.peer
185    }
186}
187
188#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub enum EphemeralEventTrigger {
190    Local,
191    Import,
192    Timeout,
193}
194
195#[derive(Debug, Clone)]
196pub struct EphemeralStoreEvent {
197    pub by: EphemeralEventTrigger,
198    pub added: Arc<Vec<String>>,
199    pub updated: Arc<Vec<String>>,
200    pub removed: Arc<Vec<String>>,
201}
202
203pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
204pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
205
206/// `EphemeralStore` is a structure that tracks ephemeral key-value state across peers.
207///
208/// - Use it for syncing lightweight presence/state like cursors, selections, and UI hints.
209/// - Each key uses timestamp-based LWW (Last-Write-Wins) conflict resolution.
210/// - Timeout unit: milliseconds.
211/// - After timeout: entries are considered expired. They are omitted from
212///   `encode`/`encode_all`, and calling [`remove_outdated`] will purge them and
213///   notify subscribers with `EphemeralEventTrigger::Timeout`.
214///
215/// In Rust, you are responsible for periodically calling [`remove_outdated`]
216/// if you want timed-out entries to be removed and corresponding events to be
217/// emitted. In the WASM/TS wrapper, a timer runs automatically while the store
218/// is non-empty.
219///
220/// See: https://loro.dev/docs/tutorial/ephemeral
221///
222/// # Example
223///
224/// ```rust
225/// use loro_internal::awareness::EphemeralStore;
226///
227/// let mut store = EphemeralStore::new(1000);
228/// store.set("key", "value");
229/// let encoded = store.encode("key");
230/// let mut store2 = EphemeralStore::new(1000);
231/// store.subscribe_local_updates(Box::new(|data| {
232///     println!("local update: {:?}", data);
233///     true
234/// }));
235/// store2.apply(&encoded);
236/// assert_eq!(store2.get("key"), Some("value".into()));
237/// ```
238#[derive(Debug, Clone)]
239pub struct EphemeralStore {
240    inner: Arc<EphemeralStoreInner>,
241}
242
243impl EphemeralStore {
244    /// Create a new `EphemeralStore`.
245    ///
246    /// - `timeout`: inactivity timeout in milliseconds. If a key does not
247    ///   receive updates within this duration, it is considered expired. It
248    ///   will be skipped by `encode`/`encode_all`, and removed when
249    ///   [`remove_outdated`] is called (triggering a `Timeout` event to
250    ///   subscribers).
251    pub fn new(timeout: i64) -> Self {
252        Self {
253            inner: Arc::new(EphemeralStoreInner::new(timeout)),
254        }
255    }
256
257    /// Encode the latest value of `key`.
258    ///
259    /// Expired keys (past timeout) are omitted and produce an empty payload.
260    pub fn encode(&self, key: &str) -> Vec<u8> {
261        self.inner.encode(key)
262    }
263
264    /// Encode all non-expired keys.
265    ///
266    /// Entries that have exceeded the timeout are not included in the payload.
267    pub fn encode_all(&self) -> Vec<u8> {
268        self.inner.encode_all()
269    }
270
271    /// Apply encoded updates imported from another peer/process.
272    ///
273    /// Subscribers receive an event with `by = Import` and the lists of
274    /// `added`/`updated`/`removed` keys as appropriate.
275    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
276        self.inner.apply(data)
277    }
278
279    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
280        self.inner.set(key, value)
281    }
282
283    pub fn delete(&self, key: &str) {
284        self.inner.delete(key)
285    }
286
287    pub fn get(&self, key: &str) -> Option<LoroValue> {
288        self.inner.get(key)
289    }
290
291    /// Remove entries whose last update timestamp has exceeded the timeout.
292    ///
293    /// If any keys are removed, subscribers receive an event with
294    /// `by = Timeout` and the `removed` keys. This does not run automatically
295    /// in Rust; call it on your own schedule.
296    pub fn remove_outdated(&self) {
297        self.inner.remove_outdated()
298    }
299
300    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
301        self.inner.get_all_states()
302    }
303
304    pub fn keys(&self) -> Vec<String> {
305        self.inner.keys()
306    }
307
308    /// Subscribe to local ephemeral/awareness updates.
309    ///
310    /// The callback receives encoded update bytes whenever local ephemeral state changes.
311    /// This is useful for syncing awareness information like cursor positions, selections,
312    /// or presence data to other peers in real-time.
313    ///
314    /// **Auto-unsubscription**: If the callback returns `false`, the subscription will be
315    /// automatically removed, providing a convenient way to implement one-time or conditional
316    /// subscriptions in Rust.
317    ///
318    /// # Parameters
319    /// - `callback`: Function that receives `&Vec<u8>` (encoded ephemeral updates) and returns `bool`
320    ///   - Return `true` to keep the subscription active
321    ///   - Return `false` to automatically unsubscribe
322    ///
323    /// # Example
324    /// ```rust
325    /// use loro_internal::awareness::EphemeralStore;
326    /// use std::sync::{Arc, Mutex};
327    ///
328    /// let store = EphemeralStore::new(30000);
329    /// let update_count = Arc::new(Mutex::new(0));
330    /// let count_clone = update_count.clone();
331    ///
332    /// // Subscribe and collect first 3 updates, then auto-unsubscribe
333    /// let sub = store.subscribe_local_updates(Box::new(move |bytes| {
334    ///     println!("Received {} bytes of ephemeral data", bytes.len());
335    ///     let mut count = count_clone.lock().unwrap();
336    ///     *count += 1;
337    ///     *count < 3  // Auto-unsubscribe after 3 updates
338    /// }));
339    ///
340    /// store.set("cursor", 42);
341    /// ```
342    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
343        self.inner.subscribe_local_updates(callback)
344    }
345
346    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
347        self.inner.subscribe(callback)
348    }
349}
350
351struct EphemeralStoreInner {
352    states: Mutex<FxHashMap<String, State>>,
353    local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
354    subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
355    timeout: AtomicI64,
356}
357
358impl std::fmt::Debug for EphemeralStoreInner {
359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360        write!(
361            f,
362            "AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
363            self.states, self.timeout
364        )
365    }
366}
367
368#[derive(Serialize, Deserialize)]
369struct EncodedState<'a> {
370    #[serde(borrow)]
371    key: &'a str,
372    value: Option<LoroValue>,
373    timestamp: i64,
374}
375
376#[derive(Deserialize)]
377struct DecodedState<'a> {
378    #[serde(borrow)]
379    key: &'a str,
380    #[serde(deserialize_with = "deserialize_optional_depth_limited_loro_value")]
381    value: Option<LoroValue>,
382    timestamp: i64,
383}
384
385const EPHEMERAL_VALUE_MAX_DEPTH: usize = 512;
386
387fn deserialize_optional_depth_limited_loro_value<'de, D>(
388    deserializer: D,
389) -> Result<Option<LoroValue>, D::Error>
390where
391    D: Deserializer<'de>,
392{
393    Option::<DepthLimitedLoroValue>::deserialize(deserializer)
394        .map(|value| value.map(DepthLimitedLoroValue::into_inner))
395}
396
397fn deserialize_depth_limited_loro_value<'de, D>(deserializer: D) -> Result<LoroValue, D::Error>
398where
399    D: Deserializer<'de>,
400{
401    LoroValueSeed {
402        remaining_depth: EPHEMERAL_VALUE_MAX_DEPTH,
403    }
404    .deserialize(deserializer)
405}
406
407struct DepthLimitedLoroValue(LoroValue);
408
409impl DepthLimitedLoroValue {
410    fn into_inner(self) -> LoroValue {
411        self.0
412    }
413}
414
415impl<'de> Deserialize<'de> for DepthLimitedLoroValue {
416    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
417    where
418        D: Deserializer<'de>,
419    {
420        deserialize_depth_limited_loro_value(deserializer).map(Self)
421    }
422}
423
424#[derive(Clone, Copy)]
425struct LoroValueSeed {
426    remaining_depth: usize,
427}
428
429impl LoroValueSeed {
430    fn nested<E>(self) -> Result<Self, E>
431    where
432        E: serde::de::Error,
433    {
434        let remaining_depth = self
435            .remaining_depth
436            .checked_sub(1)
437            .ok_or_else(|| E::custom("LoroValue nesting depth exceeded"))?;
438        Ok(Self { remaining_depth })
439    }
440}
441
442impl<'de> DeserializeSeed<'de> for LoroValueSeed {
443    type Value = LoroValue;
444
445    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
446    where
447        D: Deserializer<'de>,
448    {
449        if deserializer.is_human_readable() {
450            return Err(serde::de::Error::custom(
451                "human-readable LoroValue is not supported by the awareness decoder",
452            ));
453        }
454
455        deserializer.deserialize_enum(
456            "LoroValue",
457            &[
458                "Null",
459                "Bool",
460                "Double",
461                "I32",
462                "String",
463                "List",
464                "Map",
465                "Container",
466                "Binary",
467            ],
468            LoroValueSeedVisitor { seed: self },
469        )
470    }
471}
472
473struct LoroValueSeedVisitor {
474    seed: LoroValueSeed,
475}
476
477impl<'de> serde::de::Visitor<'de> for LoroValueSeedVisitor {
478    type Value = LoroValue;
479
480    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
481        formatter.write_str("a depth-limited LoroValue")
482    }
483
484    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
485    where
486        A: EnumAccess<'de>,
487    {
488        match data.variant()? {
489            (DepthLimitedLoroValueField::Null, v) => {
490                v.unit_variant()?;
491                Ok(LoroValue::Null)
492            }
493            (DepthLimitedLoroValueField::Bool, v) => v.newtype_variant().map(LoroValue::Bool),
494            (DepthLimitedLoroValueField::Double, v) => v.newtype_variant().map(LoroValue::Double),
495            (DepthLimitedLoroValueField::I32, v) => v.newtype_variant().map(LoroValue::I64),
496            (DepthLimitedLoroValueField::String, v) => v
497                .newtype_variant()
498                .map(|value: String| LoroValue::String(value.into())),
499            (DepthLimitedLoroValueField::List, v) => v
500                .newtype_variant_seed(LoroListSeed {
501                    value_seed: self.seed.nested()?,
502                })
503                .map(|value| LoroValue::List(value.into())),
504            (DepthLimitedLoroValueField::Map, v) => v
505                .newtype_variant_seed(LoroMapSeed {
506                    value_seed: self.seed.nested()?,
507                })
508                .map(|value| LoroValue::Map(value.into())),
509            (DepthLimitedLoroValueField::Container, v) => {
510                v.newtype_variant().map(LoroValue::Container)
511            }
512            (DepthLimitedLoroValueField::Binary, v) => v
513                .newtype_variant()
514                .map(|value: Vec<u8>| LoroValue::Binary(value.into())),
515        }
516    }
517}
518
519#[derive(Deserialize)]
520enum DepthLimitedLoroValueField {
521    Null,
522    Bool,
523    Double,
524    I32,
525    String,
526    List,
527    Map,
528    Container,
529    Binary,
530}
531
532struct LoroListSeed {
533    value_seed: LoroValueSeed,
534}
535
536impl<'de> DeserializeSeed<'de> for LoroListSeed {
537    type Value = Vec<LoroValue>;
538
539    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
540    where
541        D: Deserializer<'de>,
542    {
543        deserializer.deserialize_seq(LoroListVisitor {
544            value_seed: self.value_seed,
545        })
546    }
547}
548
549struct LoroListVisitor {
550    value_seed: LoroValueSeed,
551}
552
553impl<'de> serde::de::Visitor<'de> for LoroListVisitor {
554    type Value = Vec<LoroValue>;
555
556    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
557        formatter.write_str("a depth-limited LoroValue list")
558    }
559
560    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
561    where
562        A: SeqAccess<'de>,
563    {
564        let mut list = Vec::new();
565        while let Some(value) = seq.next_element_seed(self.value_seed)? {
566            list.push(value);
567        }
568        Ok(list)
569    }
570}
571
572struct LoroMapSeed {
573    value_seed: LoroValueSeed,
574}
575
576impl<'de> DeserializeSeed<'de> for LoroMapSeed {
577    type Value = FxHashMap<String, LoroValue>;
578
579    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
580    where
581        D: Deserializer<'de>,
582    {
583        deserializer.deserialize_map(LoroMapVisitor {
584            value_seed: self.value_seed,
585        })
586    }
587}
588
589struct LoroMapVisitor {
590    value_seed: LoroValueSeed,
591}
592
593impl<'de> serde::de::Visitor<'de> for LoroMapVisitor {
594    type Value = FxHashMap<String, LoroValue>;
595
596    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
597        formatter.write_str("a depth-limited LoroValue map")
598    }
599
600    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
601    where
602        A: MapAccess<'de>,
603    {
604        let mut value = FxHashMap::default();
605        while let Some(key) = map.next_key()? {
606            let entry = map.next_value_seed(self.value_seed)?;
607            value.insert(key, entry);
608        }
609        Ok(value)
610    }
611}
612
613#[derive(Debug, Clone)]
614struct State {
615    state: Option<LoroValue>,
616    timestamp: i64,
617}
618
619impl EphemeralStoreInner {
620    pub fn new(timeout: i64) -> EphemeralStoreInner {
621        EphemeralStoreInner {
622            timeout: AtomicI64::new(timeout),
623            states: Mutex::new(FxHashMap::default()),
624            local_subs: SubscriberSetWithQueue::new(),
625            subscribers: SubscriberSetWithQueue::new(),
626        }
627    }
628
629    pub fn encode(&self, key: &str) -> Vec<u8> {
630        let mut peers_info = Vec::new();
631        let now = get_sys_timestamp() as Timestamp;
632        let states = self.states.lock();
633        if let Some(peer_state) = states.get(key) {
634            if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
635            {
636                return vec![];
637            }
638            let encoded_peer_info = EncodedState {
639                key,
640                value: peer_state.state.clone(),
641                timestamp: peer_state.timestamp,
642            };
643            peers_info.push(encoded_peer_info);
644        }
645
646        postcard::to_allocvec(&peers_info).unwrap()
647    }
648
649    pub fn encode_all(&self) -> Vec<u8> {
650        let mut peers_info = Vec::new();
651        let now = get_sys_timestamp() as Timestamp;
652        let states = self.states.lock();
653        for (key, peer_state) in states.iter() {
654            if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
655            {
656                continue;
657            }
658            let encoded_peer_info = EncodedState {
659                key,
660                value: peer_state.state.clone(),
661                timestamp: peer_state.timestamp,
662            };
663            peers_info.push(encoded_peer_info);
664        }
665        postcard::to_allocvec(&peers_info).unwrap()
666    }
667
668    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
669        let peers_info = match postcard::from_bytes::<Vec<DecodedState>>(data) {
670            Ok(ans) => ans,
671            Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
672        };
673
674        let mut updated_keys = Vec::new();
675        let mut added_keys = Vec::new();
676        let mut removed_keys = Vec::new();
677        let now = get_sys_timestamp() as Timestamp;
678        let timeout = self.timeout.load(std::sync::atomic::Ordering::Relaxed);
679        let mut states = self.states.lock();
680        for DecodedState {
681            key,
682            value: record,
683            timestamp,
684        } in peers_info
685        {
686            if now - timestamp > timeout {
687                continue;
688            }
689
690            match states.get_mut(key) {
691                Some(peer_info) if peer_info.timestamp >= timestamp => {
692                    // do nothing
693                }
694                _ => {
695                    let old = states.insert(
696                        key.to_string(),
697                        State {
698                            state: record.clone(),
699                            timestamp,
700                        },
701                    );
702                    match (old, record) {
703                        (Some(_), Some(_)) => updated_keys.push(key.to_string()),
704                        (None, Some(_)) => added_keys.push(key.to_string()),
705                        (Some(_), None) => removed_keys.push(key.to_string()),
706                        (None, None) => {}
707                    }
708                }
709            }
710        }
711
712        drop(states);
713        if !self.subscribers.inner().is_empty() {
714            self.subscribers.emit(
715                &(),
716                EphemeralStoreEvent {
717                    by: EphemeralEventTrigger::Import,
718                    added: Arc::new(added_keys),
719                    updated: Arc::new(updated_keys),
720                    removed: Arc::new(removed_keys),
721                },
722            );
723        }
724
725        Ok(())
726    }
727
728    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
729        self._set_local_state(key, Some(value.into()));
730    }
731
732    pub fn delete(&self, key: &str) {
733        self._set_local_state(key, None);
734    }
735
736    pub fn get(&self, key: &str) -> Option<LoroValue> {
737        let states = self.states.lock();
738        states.get(key).and_then(|x| x.state.clone())
739    }
740
741    pub fn remove_outdated(&self) {
742        let now = get_sys_timestamp() as Timestamp;
743        let mut removed = Vec::new();
744        let mut states = self.states.lock();
745        states.retain(|key, state| {
746            if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
747                if state.state.is_some() {
748                    removed.push(key.clone());
749                }
750                false
751            } else {
752                true
753            }
754        });
755        drop(states);
756        if !self.subscribers.inner().is_empty() {
757            self.subscribers.emit(
758                &(),
759                EphemeralStoreEvent {
760                    by: EphemeralEventTrigger::Timeout,
761                    added: Arc::new(Vec::new()),
762                    updated: Arc::new(Vec::new()),
763                    removed: Arc::new(removed),
764                },
765            );
766        }
767    }
768
769    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
770        let states = self.states.lock();
771        states
772            .iter()
773            .filter(|(_, v)| v.state.is_some())
774            .map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
775            .collect()
776    }
777
778    pub fn keys(&self) -> Vec<String> {
779        let states = self.states.lock();
780        states
781            .keys()
782            .filter(|&k| states.get(k).unwrap().state.is_some())
783            .map(|s| s.to_string())
784            .collect()
785    }
786
787    /// Subscribe to local ephemeral state updates.
788    ///
789    /// The callback receives encoded update bytes whenever the local peer's ephemeral state
790    /// is modified. This enables real-time synchronization of awareness data like cursor
791    /// positions, user presence, or temporary collaborative state.
792    ///
793    /// **Auto-unsubscription**: If the callback returns `false`, the subscription will be
794    /// automatically removed, making it easy to implement conditional or temporary subscriptions.
795    ///
796    /// # Parameters
797    /// - `callback`: Function that receives `&Vec<u8>` and returns `bool`
798    ///   - Return `true` to maintain the subscription
799    ///   - Return `false` to automatically unsubscribe
800    ///
801    /// # Returns
802    /// A `Subscription` that can be used to manually unsubscribe if needed.
803    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
804        let (sub, activate) = self.local_subs.inner().insert((), callback);
805        activate();
806        sub
807    }
808
809    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
810        let (sub, activate) = self.subscribers.inner().insert((), callback);
811        activate();
812        sub
813    }
814
815    fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
816        let is_delete = value.is_none();
817        let mut states = self.states.lock();
818        let old = states.insert(
819            key.to_string(),
820            State {
821                state: value,
822                timestamp: get_sys_timestamp() as Timestamp,
823            },
824        );
825
826        drop(states);
827        if !self.local_subs.inner().is_empty() {
828            self.local_subs.emit(&(), self.encode(key));
829        }
830        if !self.subscribers.inner().is_empty() {
831            if old.is_some() {
832                self.subscribers.emit(
833                    &(),
834                    EphemeralStoreEvent {
835                        by: EphemeralEventTrigger::Local,
836                        added: Arc::new(Vec::new()),
837                        updated: if !is_delete {
838                            Arc::new(vec![key.to_string()])
839                        } else {
840                            Arc::new(Vec::new())
841                        },
842                        removed: if !is_delete {
843                            Arc::new(Vec::new())
844                        } else {
845                            Arc::new(vec![key.to_string()])
846                        },
847                    },
848                );
849            } else if !is_delete {
850                self.subscribers.emit(
851                    &(),
852                    EphemeralStoreEvent {
853                        by: EphemeralEventTrigger::Local,
854                        added: Arc::new(vec![key.to_string()]),
855                        updated: Arc::new(Vec::new()),
856                        removed: Arc::new(Vec::new()),
857                    },
858                );
859            }
860        }
861    }
862}