loro_internal/
awareness.rs

1//! Ephemeral presence utilities.
2//!
3//! `EphemeralStore` is the recommended API: a timestamped, last-write-wins key-value
4//! store for transient presence data (cursors, selections, etc.). It supports:
5//! - Per-key timeouts: expired entries are skipped by `encode`/`encode_all` and removed
6//!   by `remove_outdated`.
7//! - Import/export via `encode`/`apply` for syncing between peers.
8//! - Subscriptions for both local updates (raw bytes to send) and merged updates.
9//!
10//! The legacy `Awareness` type remains for backward compatibility but is deprecated in
11//! favor of `EphemeralStore`.
12use std::sync::atomic::AtomicI64;
13use std::sync::{Arc, Mutex};
14
15use loro_common::{LoroValue, PeerID};
16use rustc_hash::FxHashMap;
17use serde::{Deserialize, Serialize};
18
19use crate::change::{get_sys_timestamp, Timestamp};
20use crate::{SubscriberSetWithQueue, Subscription};
21
22/// `Awareness` is a structure that tracks the ephemeral state of peers.
23///
24/// It can be used to synchronize cursor positions, selections, and the names of the peers.
25///
26/// The state of a specific peer is expected to be removed after a specified timeout. Use
27/// `remove_outdated` to eliminate outdated states.
28#[derive(Debug, Clone)]
29#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
30pub struct Awareness {
31    peer: PeerID,
32    peers: FxHashMap<PeerID, PeerInfo>,
33    timeout: i64,
34}
35
36#[derive(Debug, Clone)]
37pub struct PeerInfo {
38    pub state: LoroValue,
39    pub counter: i32,
40    // This field is generated locally
41    pub timestamp: i64,
42}
43
44#[derive(Serialize, Deserialize)]
45struct EncodedPeerInfo {
46    peer: PeerID,
47    counter: i32,
48    record: LoroValue,
49}
50
51#[allow(deprecated)]
52impl Awareness {
53    pub fn new(peer: PeerID, timeout: i64) -> Awareness {
54        Awareness {
55            peer,
56            timeout,
57            peers: FxHashMap::default(),
58        }
59    }
60
61    pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
62        let mut peers_info = Vec::new();
63        let now = get_sys_timestamp() as Timestamp;
64        for peer in peers {
65            if let Some(peer_info) = self.peers.get(peer) {
66                if now - peer_info.timestamp > self.timeout {
67                    continue;
68                }
69
70                let encoded_peer_info = EncodedPeerInfo {
71                    peer: *peer,
72                    record: peer_info.state.clone(),
73                    counter: peer_info.counter,
74                };
75                peers_info.push(encoded_peer_info);
76            }
77        }
78
79        postcard::to_allocvec(&peers_info).unwrap()
80    }
81
82    pub fn encode_all(&self) -> Vec<u8> {
83        let mut peers_info = Vec::new();
84        let now = get_sys_timestamp() as Timestamp;
85        for (peer, peer_info) in self.peers.iter() {
86            if now - peer_info.timestamp > self.timeout {
87                continue;
88            }
89
90            let encoded_peer_info = EncodedPeerInfo {
91                peer: *peer,
92                record: peer_info.state.clone(),
93                counter: peer_info.counter,
94            };
95            peers_info.push(encoded_peer_info);
96        }
97
98        postcard::to_allocvec(&peers_info).unwrap()
99    }
100
101    /// Returns (updated, added)
102    pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
103        let peers_info: Vec<EncodedPeerInfo> = postcard::from_bytes(encoded_peers_info).unwrap();
104        let mut changed_peers = Vec::new();
105        let mut added_peers = Vec::new();
106        let now = get_sys_timestamp() as Timestamp;
107        for peer_info in peers_info {
108            match self.peers.get(&peer_info.peer) {
109                Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
110                    // do nothing
111                }
112                _ => {
113                    let old = self.peers.insert(
114                        peer_info.peer,
115                        PeerInfo {
116                            counter: peer_info.counter,
117                            state: peer_info.record,
118                            timestamp: now,
119                        },
120                    );
121                    if old.is_some() {
122                        changed_peers.push(peer_info.peer);
123                    } else {
124                        added_peers.push(peer_info.peer);
125                    }
126                }
127            }
128        }
129
130        (changed_peers, added_peers)
131    }
132
133    pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
134        self._set_local_state(value.into());
135    }
136
137    fn _set_local_state(&mut self, value: LoroValue) {
138        let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
139            state: Default::default(),
140            counter: 0,
141            timestamp: 0,
142        });
143
144        peer.state = value;
145        peer.counter += 1;
146        peer.timestamp = get_sys_timestamp() as Timestamp;
147    }
148
149    pub fn get_local_state(&self) -> Option<LoroValue> {
150        self.peers.get(&self.peer).map(|x| x.state.clone())
151    }
152
153    pub fn remove_outdated(&mut self) -> Vec<PeerID> {
154        let now = get_sys_timestamp() as Timestamp;
155        let mut removed = Vec::new();
156        self.peers.retain(|id, v| {
157            if now - v.timestamp > self.timeout {
158                removed.push(*id);
159                false
160            } else {
161                true
162            }
163        });
164
165        removed
166    }
167
168    pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
169        &self.peers
170    }
171
172    pub fn peer(&self) -> PeerID {
173        self.peer
174    }
175}
176
/// What caused an [`EphemeralStoreEvent`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EphemeralEventTrigger {
    /// A local `set`/`delete` on this store.
    Local,
    /// Encoded data from elsewhere was applied to this store.
    Import,
    /// Expired entries were purged by `remove_outdated`.
    Timeout,
}
183
184#[derive(Debug, Clone)]
185pub struct EphemeralStoreEvent {
186    pub by: EphemeralEventTrigger,
187    pub added: Arc<Vec<String>>,
188    pub updated: Arc<Vec<String>>,
189    pub removed: Arc<Vec<String>>,
190}
191
192pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
193pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
194
195/// `EphemeralStore` is a structure that tracks ephemeral key-value state across peers.
196///
197/// - Use it for syncing lightweight presence/state like cursors, selections, and UI hints.
198/// - Each key uses timestamp-based LWW (Last-Write-Wins) conflict resolution.
199/// - Timeout unit: milliseconds.
200/// - After timeout: entries are considered expired. They are omitted from
201///   `encode`/`encode_all`, and calling [`remove_outdated`] will purge them and
202///   notify subscribers with `EphemeralEventTrigger::Timeout`.
203///
204/// In Rust, you are responsible for periodically calling [`remove_outdated`]
205/// if you want timed-out entries to be removed and corresponding events to be
206/// emitted. In the WASM/TS wrapper, a timer runs automatically while the store
207/// is non-empty.
208///
209/// See: https://loro.dev/docs/tutorial/ephemeral
210///
211/// # Example
212///
213/// ```rust
214/// use loro_internal::awareness::EphemeralStore;
215///
216/// let mut store = EphemeralStore::new(1000);
217/// store.set("key", "value");
218/// let encoded = store.encode("key");
219/// let mut store2 = EphemeralStore::new(1000);
220/// store.subscribe_local_updates(Box::new(|data| {
221///     println!("local update: {:?}", data);
222///     true
223/// }));
224/// store2.apply(&encoded);
225/// assert_eq!(store2.get("key"), Some("value".into()));
226/// ```
227#[derive(Debug, Clone)]
228pub struct EphemeralStore {
229    inner: Arc<EphemeralStoreInner>,
230}
231
232impl EphemeralStore {
233    /// Create a new `EphemeralStore`.
234    ///
235    /// - `timeout`: inactivity timeout in milliseconds. If a key does not
236    ///   receive updates within this duration, it is considered expired. It
237    ///   will be skipped by `encode`/`encode_all`, and removed when
238    ///   [`remove_outdated`] is called (triggering a `Timeout` event to
239    ///   subscribers).
240    pub fn new(timeout: i64) -> Self {
241        Self {
242            inner: Arc::new(EphemeralStoreInner::new(timeout)),
243        }
244    }
245
246    /// Encode the latest value of `key`.
247    ///
248    /// Expired keys (past timeout) are omitted and produce an empty payload.
249    pub fn encode(&self, key: &str) -> Vec<u8> {
250        self.inner.encode(key)
251    }
252
253    /// Encode all non-expired keys.
254    ///
255    /// Entries that have exceeded the timeout are not included in the payload.
256    pub fn encode_all(&self) -> Vec<u8> {
257        self.inner.encode_all()
258    }
259
260    /// Apply encoded updates imported from another peer/process.
261    ///
262    /// Subscribers receive an event with `by = Import` and the lists of
263    /// `added`/`updated`/`removed` keys as appropriate.
264    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
265        self.inner.apply(data)
266    }
267
268    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
269        self.inner.set(key, value)
270    }
271
272    pub fn delete(&self, key: &str) {
273        self.inner.delete(key)
274    }
275
276    pub fn get(&self, key: &str) -> Option<LoroValue> {
277        self.inner.get(key)
278    }
279
280    /// Remove entries whose last update timestamp has exceeded the timeout.
281    ///
282    /// If any keys are removed, subscribers receive an event with
283    /// `by = Timeout` and the `removed` keys. This does not run automatically
284    /// in Rust; call it on your own schedule.
285    pub fn remove_outdated(&self) {
286        self.inner.remove_outdated()
287    }
288
289    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
290        self.inner.get_all_states()
291    }
292
293    pub fn keys(&self) -> Vec<String> {
294        self.inner.keys()
295    }
296
297    /// Subscribe to local ephemeral/awareness updates.
298    ///
299    /// The callback receives encoded update bytes whenever local ephemeral state changes.
300    /// This is useful for syncing awareness information like cursor positions, selections,
301    /// or presence data to other peers in real-time.
302    ///
303    /// **Auto-unsubscription**: If the callback returns `false`, the subscription will be
304    /// automatically removed, providing a convenient way to implement one-time or conditional
305    /// subscriptions in Rust.
306    ///
307    /// # Parameters
308    /// - `callback`: Function that receives `&Vec<u8>` (encoded ephemeral updates) and returns `bool`
309    ///   - Return `true` to keep the subscription active
310    ///   - Return `false` to automatically unsubscribe
311    ///
312    /// # Example
313    /// ```rust
314    /// use loro_internal::awareness::EphemeralStore;
315    /// use std::sync::{Arc, Mutex};
316    ///
317    /// let store = EphemeralStore::new(30000);
318    /// let update_count = Arc::new(Mutex::new(0));
319    /// let count_clone = update_count.clone();
320    ///
321    /// // Subscribe and collect first 3 updates, then auto-unsubscribe
322    /// let sub = store.subscribe_local_updates(Box::new(move |bytes| {
323    ///     println!("Received {} bytes of ephemeral data", bytes.len());
324    ///     let mut count = count_clone.lock().unwrap();
325    ///     *count += 1;
326    ///     *count < 3  // Auto-unsubscribe after 3 updates
327    /// }));
328    ///
329    /// store.set("cursor", 42);
330    /// ```
331    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
332        self.inner.subscribe_local_updates(callback)
333    }
334
335    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
336        self.inner.subscribe(callback)
337    }
338}
339
340struct EphemeralStoreInner {
341    states: Mutex<FxHashMap<String, State>>,
342    local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
343    subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
344    timeout: AtomicI64,
345}
346
347impl std::fmt::Debug for EphemeralStoreInner {
348    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349        write!(
350            f,
351            "AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
352            self.states, self.timeout
353        )
354    }
355}
356
357#[derive(Serialize, Deserialize)]
358struct EncodedState<'a> {
359    #[serde(borrow)]
360    key: &'a str,
361    value: Option<LoroValue>,
362    timestamp: i64,
363}
364
365#[derive(Debug, Clone)]
366struct State {
367    state: Option<LoroValue>,
368    timestamp: i64,
369}
370
371impl EphemeralStoreInner {
372    pub fn new(timeout: i64) -> EphemeralStoreInner {
373        EphemeralStoreInner {
374            timeout: AtomicI64::new(timeout),
375            states: Mutex::new(FxHashMap::default()),
376            local_subs: SubscriberSetWithQueue::new(),
377            subscribers: SubscriberSetWithQueue::new(),
378        }
379    }
380
381    pub fn encode(&self, key: &str) -> Vec<u8> {
382        let mut peers_info = Vec::new();
383        let now = get_sys_timestamp() as Timestamp;
384        let states = self.states.lock().unwrap();
385        if let Some(peer_state) = states.get(key) {
386            if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
387            {
388                return vec![];
389            }
390            let encoded_peer_info = EncodedState {
391                key,
392                value: peer_state.state.clone(),
393                timestamp: peer_state.timestamp,
394            };
395            peers_info.push(encoded_peer_info);
396        }
397
398        postcard::to_allocvec(&peers_info).unwrap()
399    }
400
401    pub fn encode_all(&self) -> Vec<u8> {
402        let mut peers_info = Vec::new();
403        let now = get_sys_timestamp() as Timestamp;
404        let states = self.states.lock().unwrap();
405        for (key, peer_state) in states.iter() {
406            if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
407            {
408                continue;
409            }
410            let encoded_peer_info = EncodedState {
411                key,
412                value: peer_state.state.clone(),
413                timestamp: peer_state.timestamp,
414            };
415            peers_info.push(encoded_peer_info);
416        }
417        postcard::to_allocvec(&peers_info).unwrap()
418    }
419
420    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
421        let peers_info = match postcard::from_bytes::<Vec<EncodedState>>(data) {
422            Ok(ans) => ans,
423            Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
424        };
425
426        let mut updated_keys = Vec::new();
427        let mut added_keys = Vec::new();
428        let mut removed_keys = Vec::new();
429        let now = get_sys_timestamp() as Timestamp;
430        let timeout = self.timeout.load(std::sync::atomic::Ordering::Relaxed);
431        let mut states = self.states.lock().unwrap();
432        for EncodedState {
433            key,
434            value: record,
435            timestamp,
436        } in peers_info
437        {
438            if now - timestamp > timeout {
439                continue;
440            }
441
442            match states.get_mut(key) {
443                Some(peer_info) if peer_info.timestamp >= timestamp => {
444                    // do nothing
445                }
446                _ => {
447                    let old = states.insert(
448                        key.to_string(),
449                        State {
450                            state: record.clone(),
451                            timestamp,
452                        },
453                    );
454                    match (old, record) {
455                        (Some(_), Some(_)) => updated_keys.push(key.to_string()),
456                        (None, Some(_)) => added_keys.push(key.to_string()),
457                        (Some(_), None) => removed_keys.push(key.to_string()),
458                        (None, None) => {}
459                    }
460                }
461            }
462        }
463
464        drop(states);
465        if !self.subscribers.inner().is_empty() {
466            self.subscribers.emit(
467                &(),
468                EphemeralStoreEvent {
469                    by: EphemeralEventTrigger::Import,
470                    added: Arc::new(added_keys),
471                    updated: Arc::new(updated_keys),
472                    removed: Arc::new(removed_keys),
473                },
474            );
475        }
476
477        Ok(())
478    }
479
480    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
481        self._set_local_state(key, Some(value.into()));
482    }
483
484    pub fn delete(&self, key: &str) {
485        self._set_local_state(key, None);
486    }
487
488    pub fn get(&self, key: &str) -> Option<LoroValue> {
489        let states = self.states.lock().unwrap();
490        states.get(key).and_then(|x| x.state.clone())
491    }
492
493    pub fn remove_outdated(&self) {
494        let now = get_sys_timestamp() as Timestamp;
495        let mut removed = Vec::new();
496        let mut states = self.states.lock().unwrap();
497        states.retain(|key, state| {
498            if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
499                if state.state.is_some() {
500                    removed.push(key.clone());
501                }
502                false
503            } else {
504                true
505            }
506        });
507        drop(states);
508        if !self.subscribers.inner().is_empty() {
509            self.subscribers.emit(
510                &(),
511                EphemeralStoreEvent {
512                    by: EphemeralEventTrigger::Timeout,
513                    added: Arc::new(Vec::new()),
514                    updated: Arc::new(Vec::new()),
515                    removed: Arc::new(removed),
516                },
517            );
518        }
519    }
520
521    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
522        let states = self.states.lock().unwrap();
523        states
524            .iter()
525            .filter(|(_, v)| v.state.is_some())
526            .map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
527            .collect()
528    }
529
530    pub fn keys(&self) -> Vec<String> {
531        let states = self.states.lock().unwrap();
532        states
533            .keys()
534            .filter(|&k| states.get(k).unwrap().state.is_some())
535            .map(|s| s.to_string())
536            .collect()
537    }
538
539    /// Subscribe to local ephemeral state updates.
540    ///
541    /// The callback receives encoded update bytes whenever the local peer's ephemeral state
542    /// is modified. This enables real-time synchronization of awareness data like cursor
543    /// positions, user presence, or temporary collaborative state.
544    ///
545    /// **Auto-unsubscription**: If the callback returns `false`, the subscription will be
546    /// automatically removed, making it easy to implement conditional or temporary subscriptions.
547    ///
548    /// # Parameters
549    /// - `callback`: Function that receives `&Vec<u8>` and returns `bool`
550    ///   - Return `true` to maintain the subscription
551    ///   - Return `false` to automatically unsubscribe
552    ///
553    /// # Returns
554    /// A `Subscription` that can be used to manually unsubscribe if needed.
555    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
556        let (sub, activate) = self.local_subs.inner().insert((), callback);
557        activate();
558        sub
559    }
560
561    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
562        let (sub, activate) = self.subscribers.inner().insert((), callback);
563        activate();
564        sub
565    }
566
567    fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
568        let is_delete = value.is_none();
569        let mut states = self.states.lock().unwrap();
570        let old = states.insert(
571            key.to_string(),
572            State {
573                state: value,
574                timestamp: get_sys_timestamp() as Timestamp,
575            },
576        );
577
578        drop(states);
579        if !self.local_subs.inner().is_empty() {
580            self.local_subs.emit(&(), self.encode(key));
581        }
582        if !self.subscribers.inner().is_empty() {
583            if old.is_some() {
584                self.subscribers.emit(
585                    &(),
586                    EphemeralStoreEvent {
587                        by: EphemeralEventTrigger::Local,
588                        added: Arc::new(Vec::new()),
589                        updated: if !is_delete {
590                            Arc::new(vec![key.to_string()])
591                        } else {
592                            Arc::new(Vec::new())
593                        },
594                        removed: if !is_delete {
595                            Arc::new(Vec::new())
596                        } else {
597                            Arc::new(vec![key.to_string()])
598                        },
599                    },
600                );
601            } else if !is_delete {
602                self.subscribers.emit(
603                    &(),
604                    EphemeralStoreEvent {
605                        by: EphemeralEventTrigger::Local,
606                        added: Arc::new(vec![key.to_string()]),
607                        updated: Arc::new(Vec::new()),
608                        removed: Arc::new(Vec::new()),
609                    },
610                );
611            }
612        }
613    }
614}