loro_internal/
awareness.rs

1use std::sync::atomic::AtomicI64;
2use std::sync::{Arc, Mutex};
3
4use rustc_hash::FxHashMap;
5use loro_common::{LoroValue, PeerID};
6use serde::{Deserialize, Serialize};
7
8use crate::change::{get_sys_timestamp, Timestamp};
9use crate::{SubscriberSetWithQueue, Subscription};
10
11/// `Awareness` is a structure that tracks the ephemeral state of peers.
12///
13/// It can be used to synchronize cursor positions, selections, and the names of the peers.
14///
15/// The state of a specific peer is expected to be removed after a specified timeout. Use
16/// `remove_outdated` to eliminate outdated states.
17#[derive(Debug, Clone)]
18#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
19pub struct Awareness {
20    peer: PeerID,
21    peers: FxHashMap<PeerID, PeerInfo>,
22    timeout: i64,
23}
24
25#[derive(Debug, Clone)]
26pub struct PeerInfo {
27    pub state: LoroValue,
28    pub counter: i32,
29    // This field is generated locally
30    pub timestamp: i64,
31}
32
33#[derive(Serialize, Deserialize)]
34struct EncodedPeerInfo {
35    peer: PeerID,
36    counter: i32,
37    record: LoroValue,
38}
39
40#[allow(deprecated)]
41impl Awareness {
42    pub fn new(peer: PeerID, timeout: i64) -> Awareness {
43        Awareness {
44            peer,
45            timeout,
46            peers: FxHashMap::default(),
47        }
48    }
49
50    pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
51        let mut peers_info = Vec::new();
52        let now = get_sys_timestamp() as Timestamp;
53        for peer in peers {
54            if let Some(peer_info) = self.peers.get(peer) {
55                if now - peer_info.timestamp > self.timeout {
56                    continue;
57                }
58
59                let encoded_peer_info = EncodedPeerInfo {
60                    peer: *peer,
61                    record: peer_info.state.clone(),
62                    counter: peer_info.counter,
63                };
64                peers_info.push(encoded_peer_info);
65            }
66        }
67
68        postcard::to_allocvec(&peers_info).unwrap()
69    }
70
71    pub fn encode_all(&self) -> Vec<u8> {
72        let mut peers_info = Vec::new();
73        let now = get_sys_timestamp() as Timestamp;
74        for (peer, peer_info) in self.peers.iter() {
75            if now - peer_info.timestamp > self.timeout {
76                continue;
77            }
78
79            let encoded_peer_info = EncodedPeerInfo {
80                peer: *peer,
81                record: peer_info.state.clone(),
82                counter: peer_info.counter,
83            };
84            peers_info.push(encoded_peer_info);
85        }
86
87        postcard::to_allocvec(&peers_info).unwrap()
88    }
89
90    /// Returns (updated, added)
91    pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
92        let peers_info: Vec<EncodedPeerInfo> = postcard::from_bytes(encoded_peers_info).unwrap();
93        let mut changed_peers = Vec::new();
94        let mut added_peers = Vec::new();
95        let now = get_sys_timestamp() as Timestamp;
96        for peer_info in peers_info {
97            match self.peers.get(&peer_info.peer) {
98                Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
99                    // do nothing
100                }
101                _ => {
102                    let old = self.peers.insert(
103                        peer_info.peer,
104                        PeerInfo {
105                            counter: peer_info.counter,
106                            state: peer_info.record,
107                            timestamp: now,
108                        },
109                    );
110                    if old.is_some() {
111                        changed_peers.push(peer_info.peer);
112                    } else {
113                        added_peers.push(peer_info.peer);
114                    }
115                }
116            }
117        }
118
119        (changed_peers, added_peers)
120    }
121
122    pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
123        self._set_local_state(value.into());
124    }
125
126    fn _set_local_state(&mut self, value: LoroValue) {
127        let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
128            state: Default::default(),
129            counter: 0,
130            timestamp: 0,
131        });
132
133        peer.state = value;
134        peer.counter += 1;
135        peer.timestamp = get_sys_timestamp() as Timestamp;
136    }
137
138    pub fn get_local_state(&self) -> Option<LoroValue> {
139        self.peers.get(&self.peer).map(|x| x.state.clone())
140    }
141
142    pub fn remove_outdated(&mut self) -> Vec<PeerID> {
143        let now = get_sys_timestamp() as Timestamp;
144        let mut removed = Vec::new();
145        self.peers.retain(|id, v| {
146            if now - v.timestamp > self.timeout {
147                removed.push(*id);
148                false
149            } else {
150                true
151            }
152        });
153
154        removed
155    }
156
157    pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
158        &self.peers
159    }
160
161    pub fn peer(&self) -> PeerID {
162        self.peer
163    }
164}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum EphemeralEventTrigger {
168    Local,
169    Import,
170    Timeout,
171}
172
173#[derive(Debug, Clone)]
174pub struct EphemeralStoreEvent {
175    pub by: EphemeralEventTrigger,
176    pub added: Arc<Vec<String>>,
177    pub updated: Arc<Vec<String>>,
178    pub removed: Arc<Vec<String>>,
179}
180
181pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
182pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
183
184/// `EphemeralStore` is a structure that tracks ephemeral key-value state across peers.
185///
186/// - Use it for syncing lightweight presence/state like cursors, selections, and UI hints.
187/// - Each key uses timestamp-based LWW (Last-Write-Wins) conflict resolution.
188/// - Timeout unit: milliseconds.
189/// - After timeout: entries are considered expired. They are omitted from
190///   `encode`/`encode_all`, and calling [`remove_outdated`] will purge them and
191///   notify subscribers with `EphemeralEventTrigger::Timeout`.
192///
193/// In Rust, you are responsible for periodically calling [`remove_outdated`]
194/// if you want timed-out entries to be removed and corresponding events to be
195/// emitted. In the WASM/TS wrapper, a timer runs automatically while the store
196/// is non-empty.
197///
198/// See: https://loro.dev/docs/tutorial/ephemeral
199///
200/// # Example
201///
202/// ```rust
203/// use loro_internal::awareness::EphemeralStore;
204///
205/// let mut store = EphemeralStore::new(1000);
206/// store.set("key", "value");
207/// let encoded = store.encode("key");
208/// let mut store2 = EphemeralStore::new(1000);
209/// store.subscribe_local_updates(Box::new(|data| {
210///     println!("local update: {:?}", data);
211///     true
212/// }));
213/// store2.apply(&encoded);
214/// assert_eq!(store2.get("key"), Some("value".into()));
215/// ```
216#[derive(Debug, Clone)]
217pub struct EphemeralStore {
218    inner: Arc<EphemeralStoreInner>,
219}
220
221impl EphemeralStore {
222    /// Create a new `EphemeralStore`.
223    ///
224    /// - `timeout`: inactivity timeout in milliseconds. If a key does not
225    ///   receive updates within this duration, it is considered expired. It
226    ///   will be skipped by `encode`/`encode_all`, and removed when
227    ///   [`remove_outdated`] is called (triggering a `Timeout` event to
228    ///   subscribers).
229    pub fn new(timeout: i64) -> Self {
230        Self {
231            inner: Arc::new(EphemeralStoreInner::new(timeout)),
232        }
233    }
234
235    /// Encode the latest value of `key`.
236    ///
237    /// Expired keys (past timeout) are omitted and produce an empty payload.
238    pub fn encode(&self, key: &str) -> Vec<u8> {
239        self.inner.encode(key)
240    }
241
242    /// Encode all non-expired keys.
243    ///
244    /// Entries that have exceeded the timeout are not included in the payload.
245    pub fn encode_all(&self) -> Vec<u8> {
246        self.inner.encode_all()
247    }
248    
249    /// Apply encoded updates imported from another peer/process.
250    ///
251    /// Subscribers receive an event with `by = Import` and the lists of
252    /// `added`/`updated`/`removed` keys as appropriate.
253    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
254        self.inner.apply(data)
255    }
256
257    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
258        self.inner.set(key, value)
259    }
260
261    pub fn delete(&self, key: &str) {
262        self.inner.delete(key)
263    }
264
265    pub fn get(&self, key: &str) -> Option<LoroValue> {
266        self.inner.get(key)
267    }
268
269    /// Remove entries whose last update timestamp has exceeded the timeout.
270    ///
271    /// If any keys are removed, subscribers receive an event with
272    /// `by = Timeout` and the `removed` keys. This does not run automatically
273    /// in Rust; call it on your own schedule.
274    pub fn remove_outdated(&self) {
275        self.inner.remove_outdated()
276    }
277
278    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
279        self.inner.get_all_states()
280    }
281
282    pub fn keys(&self) -> Vec<String> {
283        self.inner.keys()
284    }
285
286    /// Subscribe to local ephemeral/awareness updates.
287    ///
288    /// The callback receives encoded update bytes whenever local ephemeral state changes.
289    /// This is useful for syncing awareness information like cursor positions, selections,
290    /// or presence data to other peers in real-time.
291    ///
292    /// **Auto-unsubscription**: If the callback returns `false`, the subscription will be
293    /// automatically removed, providing a convenient way to implement one-time or conditional
294    /// subscriptions in Rust.
295    ///
296    /// # Parameters
297    /// - `callback`: Function that receives `&Vec<u8>` (encoded ephemeral updates) and returns `bool`
298    ///   - Return `true` to keep the subscription active
299    ///   - Return `false` to automatically unsubscribe
300    ///
301    /// # Example
302    /// ```rust
303    /// use loro_internal::awareness::EphemeralStore;
304    /// use std::sync::{Arc, Mutex};
305    ///
306    /// let store = EphemeralStore::new(30000);
307    /// let update_count = Arc::new(Mutex::new(0));
308    /// let count_clone = update_count.clone();
309    ///
310    /// // Subscribe and collect first 3 updates, then auto-unsubscribe
311    /// let sub = store.subscribe_local_updates(Box::new(move |bytes| {
312    ///     println!("Received {} bytes of ephemeral data", bytes.len());
313    ///     let mut count = count_clone.lock().unwrap();
314    ///     *count += 1;
315    ///     *count < 3  // Auto-unsubscribe after 3 updates
316    /// }));
317    ///
318    /// store.set("cursor", 42);
319    /// ```
320    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
321        self.inner.subscribe_local_updates(callback)
322    }
323
324    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
325        self.inner.subscribe(callback)
326    }
327}
328
329struct EphemeralStoreInner {
330    states: Mutex<FxHashMap<String, State>>,
331    local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
332    subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
333    timeout: AtomicI64,
334}
335
336impl std::fmt::Debug for EphemeralStoreInner {
337    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338        write!(
339            f,
340            "AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
341            self.states, self.timeout
342        )
343    }
344}
345
346#[derive(Serialize, Deserialize)]
347struct EncodedState<'a> {
348    #[serde(borrow)]
349    key: &'a str,
350    value: Option<LoroValue>,
351    timestamp: i64,
352}
353
354#[derive(Debug, Clone)]
355struct State {
356    state: Option<LoroValue>,
357    timestamp: i64,
358}
359
360impl EphemeralStoreInner {
361    pub fn new(timeout: i64) -> EphemeralStoreInner {
362        EphemeralStoreInner {
363            timeout: AtomicI64::new(timeout),
364            states: Mutex::new(FxHashMap::default()),
365            local_subs: SubscriberSetWithQueue::new(),
366            subscribers: SubscriberSetWithQueue::new(),
367        }
368    }
369
370    pub fn encode(&self, key: &str) -> Vec<u8> {
371        let mut peers_info = Vec::new();
372        let now = get_sys_timestamp() as Timestamp;
373        let states = self.states.lock().unwrap();
374        if let Some(peer_state) = states.get(key) {
375            if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
376            {
377                return vec![];
378            }
379            let encoded_peer_info = EncodedState {
380                key,
381                value: peer_state.state.clone(),
382                timestamp: peer_state.timestamp,
383            };
384            peers_info.push(encoded_peer_info);
385        }
386
387        postcard::to_allocvec(&peers_info).unwrap()
388    }
389
390    pub fn encode_all(&self) -> Vec<u8> {
391        let mut peers_info = Vec::new();
392        let now = get_sys_timestamp() as Timestamp;
393        let states = self.states.lock().unwrap();
394        for (key, peer_state) in states.iter() {
395            if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
396            {
397                continue;
398            }
399            let encoded_peer_info = EncodedState {
400                key,
401                value: peer_state.state.clone(),
402                timestamp: peer_state.timestamp,
403            };
404            peers_info.push(encoded_peer_info);
405        }
406        postcard::to_allocvec(&peers_info).unwrap()
407    }
408
409    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
410        let peers_info = match postcard::from_bytes::<Vec<EncodedState>>(data) {
411            Ok(ans) => ans,
412            Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
413        };
414
415        let mut updated_keys = Vec::new();
416        let mut added_keys = Vec::new();
417        let mut removed_keys = Vec::new();
418        let now = get_sys_timestamp() as Timestamp;
419        let mut states = self.states.lock().unwrap();
420        for EncodedState {
421            key,
422            value: record,
423            timestamp,
424        } in peers_info
425        {
426            match states.get_mut(key) {
427                Some(peer_info) if peer_info.timestamp >= timestamp => {
428                    // do nothing
429                }
430                _ => {
431                    let old = states.insert(
432                        key.to_string(),
433                        State {
434                            state: record.clone(),
435                            timestamp: now,
436                        },
437                    );
438                    match (old, record) {
439                        (Some(_), Some(_)) => updated_keys.push(key.to_string()),
440                        (None, Some(_)) => added_keys.push(key.to_string()),
441                        (Some(_), None) => removed_keys.push(key.to_string()),
442                        (None, None) => {}
443                    }
444                }
445            }
446        }
447
448        drop(states);
449        if !self.subscribers.inner().is_empty() {
450            self.subscribers.emit(
451                &(),
452                EphemeralStoreEvent {
453                    by: EphemeralEventTrigger::Import,
454                    added: Arc::new(added_keys),
455                    updated: Arc::new(updated_keys),
456                    removed: Arc::new(removed_keys),
457                },
458            );
459        }
460
461        Ok(())
462    }
463
464    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
465        self._set_local_state(key, Some(value.into()));
466    }
467
468    pub fn delete(&self, key: &str) {
469        self._set_local_state(key, None);
470    }
471
472    pub fn get(&self, key: &str) -> Option<LoroValue> {
473        let states = self.states.lock().unwrap();
474        states.get(key).and_then(|x| x.state.clone())
475    }
476
477    pub fn remove_outdated(&self) {
478        let now = get_sys_timestamp() as Timestamp;
479        let mut removed = Vec::new();
480        let mut states = self.states.lock().unwrap();
481        states.retain(|key, state| {
482            if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
483                if state.state.is_some() {
484                    removed.push(key.clone());
485                }
486                false
487            } else {
488                true
489            }
490        });
491        drop(states);
492        if !self.subscribers.inner().is_empty() {
493            self.subscribers.emit(
494                &(),
495                EphemeralStoreEvent {
496                    by: EphemeralEventTrigger::Timeout,
497                    added: Arc::new(Vec::new()),
498                    updated: Arc::new(Vec::new()),
499                    removed: Arc::new(removed),
500                },
501            );
502        }
503    }
504
505    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
506        let states = self.states.lock().unwrap();
507        states
508            .iter()
509            .filter(|(_, v)| v.state.is_some())
510            .map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
511            .collect()
512    }
513
514    pub fn keys(&self) -> Vec<String> {
515        let states = self.states.lock().unwrap();
516        states
517            .keys()
518            .filter(|&k| states.get(k).unwrap().state.is_some())
519            .map(|s| s.to_string())
520            .collect()
521    }
522
523    /// Subscribe to local ephemeral state updates.
524    ///
525    /// The callback receives encoded update bytes whenever the local peer's ephemeral state
526    /// is modified. This enables real-time synchronization of awareness data like cursor
527    /// positions, user presence, or temporary collaborative state.
528    ///
529    /// **Auto-unsubscription**: If the callback returns `false`, the subscription will be
530    /// automatically removed, making it easy to implement conditional or temporary subscriptions.
531    ///
532    /// # Parameters
533    /// - `callback`: Function that receives `&Vec<u8>` and returns `bool`
534    ///   - Return `true` to maintain the subscription
535    ///   - Return `false` to automatically unsubscribe
536    ///
537    /// # Returns
538    /// A `Subscription` that can be used to manually unsubscribe if needed.
539    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
540        let (sub, activate) = self.local_subs.inner().insert((), callback);
541        activate();
542        sub
543    }
544
545    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
546        let (sub, activate) = self.subscribers.inner().insert((), callback);
547        activate();
548        sub
549    }
550
551    fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
552        let is_delete = value.is_none();
553        let mut states = self.states.lock().unwrap();
554        let old = states.insert(
555            key.to_string(),
556            State {
557                state: value,
558                timestamp: get_sys_timestamp() as Timestamp,
559            },
560        );
561
562        drop(states);
563        if !self.local_subs.inner().is_empty() {
564            self.local_subs.emit(&(), self.encode(key));
565        }
566        if !self.subscribers.inner().is_empty() {
567            if old.is_some() {
568                self.subscribers.emit(
569                    &(),
570                    EphemeralStoreEvent {
571                        by: EphemeralEventTrigger::Local,
572                        added: Arc::new(Vec::new()),
573                        updated: if !is_delete {
574                            Arc::new(vec![key.to_string()])
575                        } else {
576                            Arc::new(Vec::new())
577                        },
578                        removed: if !is_delete {
579                            Arc::new(Vec::new())
580                        } else {
581                            Arc::new(vec![key.to_string()])
582                        },
583                    },
584                );
585            } else if !is_delete {
586                self.subscribers.emit(
587                    &(),
588                    EphemeralStoreEvent {
589                        by: EphemeralEventTrigger::Local,
590                        added: Arc::new(vec![key.to_string()]),
591                        updated: Arc::new(Vec::new()),
592                        removed: Arc::new(Vec::new()),
593                    },
594                );
595            }
596        }
597    }
598}