1use std::sync::atomic::AtomicI64;
2use std::sync::{Arc, Mutex};
3
4use rustc_hash::FxHashMap;
5use loro_common::{LoroValue, PeerID};
6use serde::{Deserialize, Serialize};
7
8use crate::change::{get_sys_timestamp, Timestamp};
9use crate::{SubscriberSetWithQueue, Subscription};
10
11#[derive(Debug, Clone)]
18#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
19pub struct Awareness {
20 peer: PeerID,
21 peers: FxHashMap<PeerID, PeerInfo>,
22 timeout: i64,
23}
24
25#[derive(Debug, Clone)]
26pub struct PeerInfo {
27 pub state: LoroValue,
28 pub counter: i32,
29 pub timestamp: i64,
31}
32
33#[derive(Serialize, Deserialize)]
34struct EncodedPeerInfo {
35 peer: PeerID,
36 counter: i32,
37 record: LoroValue,
38}
39
40#[allow(deprecated)]
41impl Awareness {
42 pub fn new(peer: PeerID, timeout: i64) -> Awareness {
43 Awareness {
44 peer,
45 timeout,
46 peers: FxHashMap::default(),
47 }
48 }
49
50 pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
51 let mut peers_info = Vec::new();
52 let now = get_sys_timestamp() as Timestamp;
53 for peer in peers {
54 if let Some(peer_info) = self.peers.get(peer) {
55 if now - peer_info.timestamp > self.timeout {
56 continue;
57 }
58
59 let encoded_peer_info = EncodedPeerInfo {
60 peer: *peer,
61 record: peer_info.state.clone(),
62 counter: peer_info.counter,
63 };
64 peers_info.push(encoded_peer_info);
65 }
66 }
67
68 postcard::to_allocvec(&peers_info).unwrap()
69 }
70
71 pub fn encode_all(&self) -> Vec<u8> {
72 let mut peers_info = Vec::new();
73 let now = get_sys_timestamp() as Timestamp;
74 for (peer, peer_info) in self.peers.iter() {
75 if now - peer_info.timestamp > self.timeout {
76 continue;
77 }
78
79 let encoded_peer_info = EncodedPeerInfo {
80 peer: *peer,
81 record: peer_info.state.clone(),
82 counter: peer_info.counter,
83 };
84 peers_info.push(encoded_peer_info);
85 }
86
87 postcard::to_allocvec(&peers_info).unwrap()
88 }
89
90 pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
92 let peers_info: Vec<EncodedPeerInfo> = postcard::from_bytes(encoded_peers_info).unwrap();
93 let mut changed_peers = Vec::new();
94 let mut added_peers = Vec::new();
95 let now = get_sys_timestamp() as Timestamp;
96 for peer_info in peers_info {
97 match self.peers.get(&peer_info.peer) {
98 Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
99 }
101 _ => {
102 let old = self.peers.insert(
103 peer_info.peer,
104 PeerInfo {
105 counter: peer_info.counter,
106 state: peer_info.record,
107 timestamp: now,
108 },
109 );
110 if old.is_some() {
111 changed_peers.push(peer_info.peer);
112 } else {
113 added_peers.push(peer_info.peer);
114 }
115 }
116 }
117 }
118
119 (changed_peers, added_peers)
120 }
121
122 pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
123 self._set_local_state(value.into());
124 }
125
126 fn _set_local_state(&mut self, value: LoroValue) {
127 let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
128 state: Default::default(),
129 counter: 0,
130 timestamp: 0,
131 });
132
133 peer.state = value;
134 peer.counter += 1;
135 peer.timestamp = get_sys_timestamp() as Timestamp;
136 }
137
138 pub fn get_local_state(&self) -> Option<LoroValue> {
139 self.peers.get(&self.peer).map(|x| x.state.clone())
140 }
141
142 pub fn remove_outdated(&mut self) -> Vec<PeerID> {
143 let now = get_sys_timestamp() as Timestamp;
144 let mut removed = Vec::new();
145 self.peers.retain(|id, v| {
146 if now - v.timestamp > self.timeout {
147 removed.push(*id);
148 false
149 } else {
150 true
151 }
152 });
153
154 removed
155 }
156
157 pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
158 &self.peers
159 }
160
161 pub fn peer(&self) -> PeerID {
162 self.peer
163 }
164}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum EphemeralEventTrigger {
168 Local,
169 Import,
170 Timeout,
171}
172
173#[derive(Debug, Clone)]
174pub struct EphemeralStoreEvent {
175 pub by: EphemeralEventTrigger,
176 pub added: Arc<Vec<String>>,
177 pub updated: Arc<Vec<String>>,
178 pub removed: Arc<Vec<String>>,
179}
180
181pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
182pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
183
184#[derive(Debug, Clone)]
217pub struct EphemeralStore {
218 inner: Arc<EphemeralStoreInner>,
219}
220
221impl EphemeralStore {
222 pub fn new(timeout: i64) -> Self {
230 Self {
231 inner: Arc::new(EphemeralStoreInner::new(timeout)),
232 }
233 }
234
235 pub fn encode(&self, key: &str) -> Vec<u8> {
239 self.inner.encode(key)
240 }
241
242 pub fn encode_all(&self) -> Vec<u8> {
246 self.inner.encode_all()
247 }
248
249 pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
254 self.inner.apply(data)
255 }
256
257 pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
258 self.inner.set(key, value)
259 }
260
261 pub fn delete(&self, key: &str) {
262 self.inner.delete(key)
263 }
264
265 pub fn get(&self, key: &str) -> Option<LoroValue> {
266 self.inner.get(key)
267 }
268
269 pub fn remove_outdated(&self) {
275 self.inner.remove_outdated()
276 }
277
278 pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
279 self.inner.get_all_states()
280 }
281
282 pub fn keys(&self) -> Vec<String> {
283 self.inner.keys()
284 }
285
286 pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
321 self.inner.subscribe_local_updates(callback)
322 }
323
324 pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
325 self.inner.subscribe(callback)
326 }
327}
328
329struct EphemeralStoreInner {
330 states: Mutex<FxHashMap<String, State>>,
331 local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
332 subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
333 timeout: AtomicI64,
334}
335
336impl std::fmt::Debug for EphemeralStoreInner {
337 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338 write!(
339 f,
340 "AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
341 self.states, self.timeout
342 )
343 }
344}
345
346#[derive(Serialize, Deserialize)]
347struct EncodedState<'a> {
348 #[serde(borrow)]
349 key: &'a str,
350 value: Option<LoroValue>,
351 timestamp: i64,
352}
353
354#[derive(Debug, Clone)]
355struct State {
356 state: Option<LoroValue>,
357 timestamp: i64,
358}
359
360impl EphemeralStoreInner {
361 pub fn new(timeout: i64) -> EphemeralStoreInner {
362 EphemeralStoreInner {
363 timeout: AtomicI64::new(timeout),
364 states: Mutex::new(FxHashMap::default()),
365 local_subs: SubscriberSetWithQueue::new(),
366 subscribers: SubscriberSetWithQueue::new(),
367 }
368 }
369
370 pub fn encode(&self, key: &str) -> Vec<u8> {
371 let mut peers_info = Vec::new();
372 let now = get_sys_timestamp() as Timestamp;
373 let states = self.states.lock().unwrap();
374 if let Some(peer_state) = states.get(key) {
375 if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
376 {
377 return vec![];
378 }
379 let encoded_peer_info = EncodedState {
380 key,
381 value: peer_state.state.clone(),
382 timestamp: peer_state.timestamp,
383 };
384 peers_info.push(encoded_peer_info);
385 }
386
387 postcard::to_allocvec(&peers_info).unwrap()
388 }
389
390 pub fn encode_all(&self) -> Vec<u8> {
391 let mut peers_info = Vec::new();
392 let now = get_sys_timestamp() as Timestamp;
393 let states = self.states.lock().unwrap();
394 for (key, peer_state) in states.iter() {
395 if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
396 {
397 continue;
398 }
399 let encoded_peer_info = EncodedState {
400 key,
401 value: peer_state.state.clone(),
402 timestamp: peer_state.timestamp,
403 };
404 peers_info.push(encoded_peer_info);
405 }
406 postcard::to_allocvec(&peers_info).unwrap()
407 }
408
409 pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
410 let peers_info = match postcard::from_bytes::<Vec<EncodedState>>(data) {
411 Ok(ans) => ans,
412 Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
413 };
414
415 let mut updated_keys = Vec::new();
416 let mut added_keys = Vec::new();
417 let mut removed_keys = Vec::new();
418 let now = get_sys_timestamp() as Timestamp;
419 let mut states = self.states.lock().unwrap();
420 for EncodedState {
421 key,
422 value: record,
423 timestamp,
424 } in peers_info
425 {
426 match states.get_mut(key) {
427 Some(peer_info) if peer_info.timestamp >= timestamp => {
428 }
430 _ => {
431 let old = states.insert(
432 key.to_string(),
433 State {
434 state: record.clone(),
435 timestamp: now,
436 },
437 );
438 match (old, record) {
439 (Some(_), Some(_)) => updated_keys.push(key.to_string()),
440 (None, Some(_)) => added_keys.push(key.to_string()),
441 (Some(_), None) => removed_keys.push(key.to_string()),
442 (None, None) => {}
443 }
444 }
445 }
446 }
447
448 drop(states);
449 if !self.subscribers.inner().is_empty() {
450 self.subscribers.emit(
451 &(),
452 EphemeralStoreEvent {
453 by: EphemeralEventTrigger::Import,
454 added: Arc::new(added_keys),
455 updated: Arc::new(updated_keys),
456 removed: Arc::new(removed_keys),
457 },
458 );
459 }
460
461 Ok(())
462 }
463
464 pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
465 self._set_local_state(key, Some(value.into()));
466 }
467
468 pub fn delete(&self, key: &str) {
469 self._set_local_state(key, None);
470 }
471
472 pub fn get(&self, key: &str) -> Option<LoroValue> {
473 let states = self.states.lock().unwrap();
474 states.get(key).and_then(|x| x.state.clone())
475 }
476
477 pub fn remove_outdated(&self) {
478 let now = get_sys_timestamp() as Timestamp;
479 let mut removed = Vec::new();
480 let mut states = self.states.lock().unwrap();
481 states.retain(|key, state| {
482 if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
483 if state.state.is_some() {
484 removed.push(key.clone());
485 }
486 false
487 } else {
488 true
489 }
490 });
491 drop(states);
492 if !self.subscribers.inner().is_empty() {
493 self.subscribers.emit(
494 &(),
495 EphemeralStoreEvent {
496 by: EphemeralEventTrigger::Timeout,
497 added: Arc::new(Vec::new()),
498 updated: Arc::new(Vec::new()),
499 removed: Arc::new(removed),
500 },
501 );
502 }
503 }
504
505 pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
506 let states = self.states.lock().unwrap();
507 states
508 .iter()
509 .filter(|(_, v)| v.state.is_some())
510 .map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
511 .collect()
512 }
513
514 pub fn keys(&self) -> Vec<String> {
515 let states = self.states.lock().unwrap();
516 states
517 .keys()
518 .filter(|&k| states.get(k).unwrap().state.is_some())
519 .map(|s| s.to_string())
520 .collect()
521 }
522
523 pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
540 let (sub, activate) = self.local_subs.inner().insert((), callback);
541 activate();
542 sub
543 }
544
545 pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
546 let (sub, activate) = self.subscribers.inner().insert((), callback);
547 activate();
548 sub
549 }
550
551 fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
552 let is_delete = value.is_none();
553 let mut states = self.states.lock().unwrap();
554 let old = states.insert(
555 key.to_string(),
556 State {
557 state: value,
558 timestamp: get_sys_timestamp() as Timestamp,
559 },
560 );
561
562 drop(states);
563 if !self.local_subs.inner().is_empty() {
564 self.local_subs.emit(&(), self.encode(key));
565 }
566 if !self.subscribers.inner().is_empty() {
567 if old.is_some() {
568 self.subscribers.emit(
569 &(),
570 EphemeralStoreEvent {
571 by: EphemeralEventTrigger::Local,
572 added: Arc::new(Vec::new()),
573 updated: if !is_delete {
574 Arc::new(vec![key.to_string()])
575 } else {
576 Arc::new(Vec::new())
577 },
578 removed: if !is_delete {
579 Arc::new(Vec::new())
580 } else {
581 Arc::new(vec![key.to_string()])
582 },
583 },
584 );
585 } else if !is_delete {
586 self.subscribers.emit(
587 &(),
588 EphemeralStoreEvent {
589 by: EphemeralEventTrigger::Local,
590 added: Arc::new(vec![key.to_string()]),
591 updated: Arc::new(Vec::new()),
592 removed: Arc::new(Vec::new()),
593 },
594 );
595 }
596 }
597 }
598}