1use std::sync::atomic::AtomicI64;
13use std::sync::{Arc, Mutex};
14
15use loro_common::{LoroValue, PeerID};
16use rustc_hash::FxHashMap;
17use serde::{Deserialize, Serialize};
18
19use crate::change::{get_sys_timestamp, Timestamp};
20use crate::{SubscriberSetWithQueue, Subscription};
21
/// Per-peer presence state keyed by [`PeerID`].
///
/// Deprecated in favour of `EphemeralStore`, as stated by the attribute below.
#[derive(Debug, Clone)]
#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
pub struct Awareness {
    /// The id of the local peer; `set_local_state` writes under this id.
    peer: PeerID,
    /// Last known state of every peer (the local one included once it has been set).
    peers: FxHashMap<PeerID, PeerInfo>,
    /// Maximum age of an entry before it is skipped by the encoders and dropped by
    /// `remove_outdated`. Same unit as `get_sys_timestamp()` (presumably milliseconds — confirm).
    timeout: i64,
}
35
/// The state recorded for a single peer inside [`Awareness`].
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// The value most recently published by the peer.
    pub state: LoroValue,
    /// Version counter of `state`; `Awareness::apply` only accepts an update for a known
    /// peer when its counter is strictly greater than the stored one.
    pub counter: i32,
    /// Local time at which this entry was last written (taken from `get_sys_timestamp()`).
    pub timestamp: i64,
}
43
/// Serialized form of one peer entry as produced by `Awareness::encode` / `encode_all`
/// and consumed by `Awareness::apply`.
///
/// The timestamp is deliberately absent: `apply` stamps received entries with the
/// receiver's own clock.
#[derive(Serialize, Deserialize)]
struct EncodedPeerInfo {
    /// Peer the record belongs to.
    peer: PeerID,
    /// Version counter of `record`.
    counter: i32,
    /// The peer's state value.
    record: LoroValue,
}
50
51#[allow(deprecated)]
52impl Awareness {
53 pub fn new(peer: PeerID, timeout: i64) -> Awareness {
54 Awareness {
55 peer,
56 timeout,
57 peers: FxHashMap::default(),
58 }
59 }
60
61 pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
62 let mut peers_info = Vec::new();
63 let now = get_sys_timestamp() as Timestamp;
64 for peer in peers {
65 if let Some(peer_info) = self.peers.get(peer) {
66 if now - peer_info.timestamp > self.timeout {
67 continue;
68 }
69
70 let encoded_peer_info = EncodedPeerInfo {
71 peer: *peer,
72 record: peer_info.state.clone(),
73 counter: peer_info.counter,
74 };
75 peers_info.push(encoded_peer_info);
76 }
77 }
78
79 postcard::to_allocvec(&peers_info).unwrap()
80 }
81
82 pub fn encode_all(&self) -> Vec<u8> {
83 let mut peers_info = Vec::new();
84 let now = get_sys_timestamp() as Timestamp;
85 for (peer, peer_info) in self.peers.iter() {
86 if now - peer_info.timestamp > self.timeout {
87 continue;
88 }
89
90 let encoded_peer_info = EncodedPeerInfo {
91 peer: *peer,
92 record: peer_info.state.clone(),
93 counter: peer_info.counter,
94 };
95 peers_info.push(encoded_peer_info);
96 }
97
98 postcard::to_allocvec(&peers_info).unwrap()
99 }
100
101 pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
103 let peers_info: Vec<EncodedPeerInfo> = postcard::from_bytes(encoded_peers_info).unwrap();
104 let mut changed_peers = Vec::new();
105 let mut added_peers = Vec::new();
106 let now = get_sys_timestamp() as Timestamp;
107 for peer_info in peers_info {
108 match self.peers.get(&peer_info.peer) {
109 Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
110 }
112 _ => {
113 let old = self.peers.insert(
114 peer_info.peer,
115 PeerInfo {
116 counter: peer_info.counter,
117 state: peer_info.record,
118 timestamp: now,
119 },
120 );
121 if old.is_some() {
122 changed_peers.push(peer_info.peer);
123 } else {
124 added_peers.push(peer_info.peer);
125 }
126 }
127 }
128 }
129
130 (changed_peers, added_peers)
131 }
132
133 pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
134 self._set_local_state(value.into());
135 }
136
137 fn _set_local_state(&mut self, value: LoroValue) {
138 let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
139 state: Default::default(),
140 counter: 0,
141 timestamp: 0,
142 });
143
144 peer.state = value;
145 peer.counter += 1;
146 peer.timestamp = get_sys_timestamp() as Timestamp;
147 }
148
149 pub fn get_local_state(&self) -> Option<LoroValue> {
150 self.peers.get(&self.peer).map(|x| x.state.clone())
151 }
152
153 pub fn remove_outdated(&mut self) -> Vec<PeerID> {
154 let now = get_sys_timestamp() as Timestamp;
155 let mut removed = Vec::new();
156 self.peers.retain(|id, v| {
157 if now - v.timestamp > self.timeout {
158 removed.push(*id);
159 false
160 } else {
161 true
162 }
163 });
164
165 removed
166 }
167
168 pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
169 &self.peers
170 }
171
172 pub fn peer(&self) -> PeerID {
173 self.peer
174 }
175}
176
/// What caused an [`EphemeralStoreEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EphemeralEventTrigger {
    /// A local `set` / `delete` on the store.
    Local,
    /// Remote data merged through `apply`.
    Import,
    /// Entries dropped by `remove_outdated`.
    Timeout,
}
183
/// Change notification delivered to subscribers registered with `EphemeralStore::subscribe`.
#[derive(Debug, Clone)]
pub struct EphemeralStoreEvent {
    /// The kind of operation that produced this event.
    pub by: EphemeralEventTrigger,
    /// Keys reported as newly added.
    pub added: Arc<Vec<String>>,
    /// Keys reported as updated.
    pub updated: Arc<Vec<String>>,
    /// Keys reported as removed.
    pub removed: Arc<Vec<String>>,
}
191
/// Callback invoked with the encoded bytes of a local update (see
/// `EphemeralStore::subscribe_local_updates`).
///
/// NOTE(review): the meaning of the returned `bool` is defined by `SubscriberSetWithQueue`,
/// which is not visible here — presumably "keep this subscription alive"; confirm.
pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
/// Callback invoked with every [`EphemeralStoreEvent`] (see `EphemeralStore::subscribe`).
///
/// NOTE(review): the returned `bool` is interpreted by `SubscriberSetWithQueue` — confirm
/// its meaning there.
pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
194
/// A key/value store for short-lived state whose entries expire after a timeout.
///
/// The handle wraps its state in an `Arc`, so cloning it yields another handle to the
/// same underlying store.
#[derive(Debug, Clone)]
pub struct EphemeralStore {
    /// Shared state; all public methods delegate to it.
    inner: Arc<EphemeralStoreInner>,
}
231
impl EphemeralStore {
    /// Creates an empty store. `timeout` is the maximum age of an entry, in the unit
    /// returned by `get_sys_timestamp()` (presumably milliseconds — confirm).
    pub fn new(timeout: i64) -> Self {
        Self {
            inner: Arc::new(EphemeralStoreInner::new(timeout)),
        }
    }

    /// Serializes the entry stored under `key` so it can be passed to `apply` elsewhere.
    pub fn encode(&self, key: &str) -> Vec<u8> {
        self.inner.encode(key)
    }

    /// Serializes every non-expired entry of the store.
    pub fn encode_all(&self) -> Vec<u8> {
        self.inner.encode_all()
    }

    /// Merges data produced by `encode` / `encode_all` into this store.
    ///
    /// # Errors
    ///
    /// Returns a message describing the failure when `data` cannot be decoded.
    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
        self.inner.apply(data)
    }

    /// Stores `value` under `key`, stamped with the current time.
    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
        self.inner.set(key, value)
    }

    /// Marks `key` as deleted.
    pub fn delete(&self, key: &str) {
        self.inner.delete(key)
    }

    /// Returns a copy of the value stored under `key`, or `None` if the key is absent
    /// or deleted.
    pub fn get(&self, key: &str) -> Option<LoroValue> {
        self.inner.get(key)
    }

    /// Drops every entry older than the timeout and notifies subscribers with a
    /// `Timeout` event.
    pub fn remove_outdated(&self) {
        self.inner.remove_outdated()
    }

    /// Returns a snapshot of all keys that currently hold a value, with their values.
    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
        self.inner.get_all_states()
    }

    /// Returns the keys that currently hold a value.
    pub fn keys(&self) -> Vec<String> {
        self.inner.keys()
    }

    /// Registers `callback` to receive the encoded bytes of every local `set` / `delete`.
    /// The returned `Subscription` is the handle for this registration.
    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
        self.inner.subscribe_local_updates(callback)
    }

    /// Registers `callback` to receive an `EphemeralStoreEvent` for local changes,
    /// imports and timeouts. The returned `Subscription` is the handle for this registration.
    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
        self.inner.subscribe(callback)
    }
}
339
/// Shared state behind every `EphemeralStore` handle.
struct EphemeralStoreInner {
    /// Current entries by key; a deleted key is kept with `state: None`.
    states: Mutex<FxHashMap<String, State>>,
    /// Subscribers that receive the encoded bytes of each local update.
    local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
    /// Subscribers that receive `EphemeralStoreEvent`s.
    subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
    /// Maximum entry age, in the unit returned by `get_sys_timestamp()`.
    timeout: AtomicI64,
}
346
347impl std::fmt::Debug for EphemeralStoreInner {
348 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349 write!(
350 f,
351 "AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
352 self.states, self.timeout
353 )
354 }
355}
356
/// Serialized form of one store entry, as written by `encode` / `encode_all` and read
/// by `apply`.
#[derive(Serialize, Deserialize)]
struct EncodedState<'a> {
    /// Entry key, borrowed from the source (the store on encode, the input bytes on decode).
    #[serde(borrow)]
    key: &'a str,
    /// The stored value; `None` encodes a deletion.
    value: Option<LoroValue>,
    /// Timestamp of the write that produced this entry; `apply` keeps the entry with
    /// the larger timestamp.
    timestamp: i64,
}
364
/// One entry of the ephemeral store.
#[derive(Debug, Clone)]
struct State {
    /// Current value, or `None` when the key has been deleted (the entry is kept so the
    /// deletion itself can be encoded and compared by timestamp).
    state: Option<LoroValue>,
    /// Time of the write that produced this entry.
    timestamp: i64,
}
370
371impl EphemeralStoreInner {
372 pub fn new(timeout: i64) -> EphemeralStoreInner {
373 EphemeralStoreInner {
374 timeout: AtomicI64::new(timeout),
375 states: Mutex::new(FxHashMap::default()),
376 local_subs: SubscriberSetWithQueue::new(),
377 subscribers: SubscriberSetWithQueue::new(),
378 }
379 }
380
381 pub fn encode(&self, key: &str) -> Vec<u8> {
382 let mut peers_info = Vec::new();
383 let now = get_sys_timestamp() as Timestamp;
384 let states = self.states.lock().unwrap();
385 if let Some(peer_state) = states.get(key) {
386 if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
387 {
388 return vec![];
389 }
390 let encoded_peer_info = EncodedState {
391 key,
392 value: peer_state.state.clone(),
393 timestamp: peer_state.timestamp,
394 };
395 peers_info.push(encoded_peer_info);
396 }
397
398 postcard::to_allocvec(&peers_info).unwrap()
399 }
400
401 pub fn encode_all(&self) -> Vec<u8> {
402 let mut peers_info = Vec::new();
403 let now = get_sys_timestamp() as Timestamp;
404 let states = self.states.lock().unwrap();
405 for (key, peer_state) in states.iter() {
406 if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
407 {
408 continue;
409 }
410 let encoded_peer_info = EncodedState {
411 key,
412 value: peer_state.state.clone(),
413 timestamp: peer_state.timestamp,
414 };
415 peers_info.push(encoded_peer_info);
416 }
417 postcard::to_allocvec(&peers_info).unwrap()
418 }
419
420 pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
421 let peers_info = match postcard::from_bytes::<Vec<EncodedState>>(data) {
422 Ok(ans) => ans,
423 Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
424 };
425
426 let mut updated_keys = Vec::new();
427 let mut added_keys = Vec::new();
428 let mut removed_keys = Vec::new();
429 let now = get_sys_timestamp() as Timestamp;
430 let timeout = self.timeout.load(std::sync::atomic::Ordering::Relaxed);
431 let mut states = self.states.lock().unwrap();
432 for EncodedState {
433 key,
434 value: record,
435 timestamp,
436 } in peers_info
437 {
438 if now - timestamp > timeout {
439 continue;
440 }
441
442 match states.get_mut(key) {
443 Some(peer_info) if peer_info.timestamp >= timestamp => {
444 }
446 _ => {
447 let old = states.insert(
448 key.to_string(),
449 State {
450 state: record.clone(),
451 timestamp,
452 },
453 );
454 match (old, record) {
455 (Some(_), Some(_)) => updated_keys.push(key.to_string()),
456 (None, Some(_)) => added_keys.push(key.to_string()),
457 (Some(_), None) => removed_keys.push(key.to_string()),
458 (None, None) => {}
459 }
460 }
461 }
462 }
463
464 drop(states);
465 if !self.subscribers.inner().is_empty() {
466 self.subscribers.emit(
467 &(),
468 EphemeralStoreEvent {
469 by: EphemeralEventTrigger::Import,
470 added: Arc::new(added_keys),
471 updated: Arc::new(updated_keys),
472 removed: Arc::new(removed_keys),
473 },
474 );
475 }
476
477 Ok(())
478 }
479
480 pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
481 self._set_local_state(key, Some(value.into()));
482 }
483
484 pub fn delete(&self, key: &str) {
485 self._set_local_state(key, None);
486 }
487
488 pub fn get(&self, key: &str) -> Option<LoroValue> {
489 let states = self.states.lock().unwrap();
490 states.get(key).and_then(|x| x.state.clone())
491 }
492
493 pub fn remove_outdated(&self) {
494 let now = get_sys_timestamp() as Timestamp;
495 let mut removed = Vec::new();
496 let mut states = self.states.lock().unwrap();
497 states.retain(|key, state| {
498 if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
499 if state.state.is_some() {
500 removed.push(key.clone());
501 }
502 false
503 } else {
504 true
505 }
506 });
507 drop(states);
508 if !self.subscribers.inner().is_empty() {
509 self.subscribers.emit(
510 &(),
511 EphemeralStoreEvent {
512 by: EphemeralEventTrigger::Timeout,
513 added: Arc::new(Vec::new()),
514 updated: Arc::new(Vec::new()),
515 removed: Arc::new(removed),
516 },
517 );
518 }
519 }
520
521 pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
522 let states = self.states.lock().unwrap();
523 states
524 .iter()
525 .filter(|(_, v)| v.state.is_some())
526 .map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
527 .collect()
528 }
529
530 pub fn keys(&self) -> Vec<String> {
531 let states = self.states.lock().unwrap();
532 states
533 .keys()
534 .filter(|&k| states.get(k).unwrap().state.is_some())
535 .map(|s| s.to_string())
536 .collect()
537 }
538
539 pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
556 let (sub, activate) = self.local_subs.inner().insert((), callback);
557 activate();
558 sub
559 }
560
561 pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
562 let (sub, activate) = self.subscribers.inner().insert((), callback);
563 activate();
564 sub
565 }
566
567 fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
568 let is_delete = value.is_none();
569 let mut states = self.states.lock().unwrap();
570 let old = states.insert(
571 key.to_string(),
572 State {
573 state: value,
574 timestamp: get_sys_timestamp() as Timestamp,
575 },
576 );
577
578 drop(states);
579 if !self.local_subs.inner().is_empty() {
580 self.local_subs.emit(&(), self.encode(key));
581 }
582 if !self.subscribers.inner().is_empty() {
583 if old.is_some() {
584 self.subscribers.emit(
585 &(),
586 EphemeralStoreEvent {
587 by: EphemeralEventTrigger::Local,
588 added: Arc::new(Vec::new()),
589 updated: if !is_delete {
590 Arc::new(vec![key.to_string()])
591 } else {
592 Arc::new(Vec::new())
593 },
594 removed: if !is_delete {
595 Arc::new(Vec::new())
596 } else {
597 Arc::new(vec![key.to_string()])
598 },
599 },
600 );
601 } else if !is_delete {
602 self.subscribers.emit(
603 &(),
604 EphemeralStoreEvent {
605 by: EphemeralEventTrigger::Local,
606 added: Arc::new(vec![key.to_string()]),
607 updated: Arc::new(Vec::new()),
608 removed: Arc::new(Vec::new()),
609 },
610 );
611 }
612 }
613 }
614}