1use crate::sync::Mutex;
13use std::sync::atomic::AtomicI64;
14use std::sync::Arc;
15
16use loro_common::{LoroValue, PeerID};
17use rustc_hash::FxHashMap;
18use serde::de::{DeserializeSeed, EnumAccess, MapAccess, SeqAccess, VariantAccess};
19use serde::{Deserialize, Deserializer, Serialize};
20
21use crate::change::{get_sys_timestamp, Timestamp};
22use crate::{SubscriberSetWithQueue, Subscription};
23
/// Legacy per-peer presence store.
///
/// Each peer owns one `PeerInfo` entry keyed by its `PeerID`. The type is
/// deprecated (see the attribute below) in favour of `EphemeralStore`.
#[derive(Debug, Clone)]
#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
pub struct Awareness {
    /// Id of the local peer; `set_local_state` writes under this key.
    peer: PeerID,
    /// Latest known state per peer (includes the local peer once it has been set).
    peers: FxHashMap<PeerID, PeerInfo>,
    /// Maximum age of an entry before `encode`/`encode_all` skip it and
    /// `remove_outdated` drops it; compared against `get_sys_timestamp()` differences.
    timeout: i64,
}
37
/// State recorded for a single peer inside `Awareness`.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// The peer's most recent state value.
    pub state: LoroValue,
    /// Version counter of the state; `Awareness::apply` only accepts an
    /// incoming entry whose counter is greater than the stored one.
    pub counter: i32,
    /// Local time (from `get_sys_timestamp()`) at which this entry was last written.
    pub timestamp: i64,
}
45
/// Wire representation of one peer entry produced by `Awareness::encode`
/// and `Awareness::encode_all`.
#[derive(Serialize, Deserialize)]
struct EncodedPeerInfo {
    /// Peer that owns the record.
    peer: PeerID,
    /// Version counter of the record.
    counter: i32,
    /// The peer's state value.
    record: LoroValue,
}
52
/// Decoding counterpart of `EncodedPeerInfo` used by `Awareness::apply`.
///
/// It has the same field layout, but `record` is read through the
/// depth-limited deserializer instead of `LoroValue`'s own `Deserialize` impl.
#[derive(Deserialize)]
struct DecodedPeerInfo {
    /// Peer that owns the record.
    peer: PeerID,
    /// Version counter of the record.
    counter: i32,
    /// The peer's state value, rejected if it nests deeper than
    /// `EPHEMERAL_VALUE_MAX_DEPTH`.
    #[serde(deserialize_with = "deserialize_depth_limited_loro_value")]
    record: LoroValue,
}
60
61#[allow(deprecated)]
62impl Awareness {
63 pub fn new(peer: PeerID, timeout: i64) -> Awareness {
64 Awareness {
65 peer,
66 timeout,
67 peers: FxHashMap::default(),
68 }
69 }
70
71 pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
72 let mut peers_info = Vec::new();
73 let now = get_sys_timestamp() as Timestamp;
74 for peer in peers {
75 if let Some(peer_info) = self.peers.get(peer) {
76 if now - peer_info.timestamp > self.timeout {
77 continue;
78 }
79
80 let encoded_peer_info = EncodedPeerInfo {
81 peer: *peer,
82 record: peer_info.state.clone(),
83 counter: peer_info.counter,
84 };
85 peers_info.push(encoded_peer_info);
86 }
87 }
88
89 postcard::to_allocvec(&peers_info).unwrap()
90 }
91
92 pub fn encode_all(&self) -> Vec<u8> {
93 let mut peers_info = Vec::new();
94 let now = get_sys_timestamp() as Timestamp;
95 for (peer, peer_info) in self.peers.iter() {
96 if now - peer_info.timestamp > self.timeout {
97 continue;
98 }
99
100 let encoded_peer_info = EncodedPeerInfo {
101 peer: *peer,
102 record: peer_info.state.clone(),
103 counter: peer_info.counter,
104 };
105 peers_info.push(encoded_peer_info);
106 }
107
108 postcard::to_allocvec(&peers_info).unwrap()
109 }
110
111 pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
113 let peers_info: Vec<DecodedPeerInfo> =
114 postcard::from_bytes(encoded_peers_info).expect("Failed to decode awareness data");
115 let mut changed_peers = Vec::new();
116 let mut added_peers = Vec::new();
117 let now = get_sys_timestamp() as Timestamp;
118 for peer_info in peers_info {
119 match self.peers.get(&peer_info.peer) {
120 Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
121 }
123 _ => {
124 let old = self.peers.insert(
125 peer_info.peer,
126 PeerInfo {
127 counter: peer_info.counter,
128 state: peer_info.record,
129 timestamp: now,
130 },
131 );
132 if old.is_some() {
133 changed_peers.push(peer_info.peer);
134 } else {
135 added_peers.push(peer_info.peer);
136 }
137 }
138 }
139 }
140
141 (changed_peers, added_peers)
142 }
143
144 pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
145 self._set_local_state(value.into());
146 }
147
148 fn _set_local_state(&mut self, value: LoroValue) {
149 let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
150 state: Default::default(),
151 counter: 0,
152 timestamp: 0,
153 });
154
155 peer.state = value;
156 peer.counter += 1;
157 peer.timestamp = get_sys_timestamp() as Timestamp;
158 }
159
160 pub fn get_local_state(&self) -> Option<LoroValue> {
161 self.peers.get(&self.peer).map(|x| x.state.clone())
162 }
163
164 pub fn remove_outdated(&mut self) -> Vec<PeerID> {
165 let now = get_sys_timestamp() as Timestamp;
166 let mut removed = Vec::new();
167 self.peers.retain(|id, v| {
168 if now - v.timestamp > self.timeout {
169 removed.push(*id);
170 false
171 } else {
172 true
173 }
174 });
175
176 removed
177 }
178
179 pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
180 &self.peers
181 }
182
183 pub fn peer(&self) -> PeerID {
184 self.peer
185 }
186}
187
/// What caused an `EphemeralStoreEvent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EphemeralEventTrigger {
    /// A local `set` or `delete` on this store.
    Local,
    /// Data merged through `apply`.
    Import,
    /// Entries dropped by `remove_outdated`.
    Timeout,
}
194
/// Change notification delivered to `EphemeralSubscriber` callbacks.
#[derive(Debug, Clone)]
pub struct EphemeralStoreEvent {
    /// What caused the change.
    pub by: EphemeralEventTrigger,
    /// Keys reported as newly added.
    pub added: Arc<Vec<String>>,
    /// Keys reported as updated.
    pub updated: Arc<Vec<String>>,
    /// Keys reported as removed.
    pub removed: Arc<Vec<String>>,
}
202
/// Callback for `subscribe_local_updates`; it receives the encoded bytes of
/// the entry that was just set or deleted locally.
// NOTE(review): the meaning of the returned `bool` is defined by
// `SubscriberSetWithQueue` and is not visible in this file — confirm there.
pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
/// Callback for `subscribe`; it receives every `EphemeralStoreEvent` emitted by the store.
// NOTE(review): the returned `bool` is interpreted by `SubscriberSetWithQueue` — confirm there.
pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
205
/// Timestamped key/value store for short-lived state.
///
/// The store is a handle around an `Arc`, so clones share the same
/// underlying entries and subscribers.
#[derive(Debug, Clone)]
pub struct EphemeralStore {
    /// Shared implementation; every public method delegates to it.
    inner: Arc<EphemeralStoreInner>,
}
242
impl EphemeralStore {
    /// Creates an empty store.
    ///
    /// `timeout` is the maximum age (difference of `get_sys_timestamp()` values)
    /// an entry may have before it is treated as outdated.
    pub fn new(timeout: i64) -> Self {
        Self {
            inner: Arc::new(EphemeralStoreInner::new(timeout)),
        }
    }

    /// Encodes the entry stored under `key`.
    ///
    /// Returns an empty byte vector when the entry exists but is older than
    /// the timeout, and an encoded empty list when the key is unknown.
    pub fn encode(&self, key: &str) -> Vec<u8> {
        self.inner.encode(key)
    }

    /// Encodes every entry that is not older than the timeout.
    pub fn encode_all(&self) -> Vec<u8> {
        self.inner.encode_all()
    }

    /// Merges data produced by `encode`/`encode_all`.
    ///
    /// # Errors
    ///
    /// Returns the decode error message when `data` cannot be decoded.
    pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
        self.inner.apply(data)
    }

    /// Stores `value` under `key`, stamped with the current time.
    pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
        self.inner.set(key, value)
    }

    /// Marks `key` as deleted by storing an empty value stamped with the current time.
    pub fn delete(&self, key: &str) {
        self.inner.delete(key)
    }

    /// Returns a clone of the value stored under `key`, or `None` when the key
    /// is unknown or deleted.
    pub fn get(&self, key: &str) -> Option<LoroValue> {
        self.inner.get(key)
    }

    /// Drops every entry older than the timeout and notifies subscribers with
    /// an `EphemeralEventTrigger::Timeout` event.
    pub fn remove_outdated(&self) {
        self.inner.remove_outdated()
    }

    /// Returns a snapshot of all keys that currently hold a value.
    pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
        self.inner.get_all_states()
    }

    /// Returns the keys that currently hold a value.
    pub fn keys(&self) -> Vec<String> {
        self.inner.keys()
    }

    /// Registers `callback` to receive the encoded bytes of each local `set`/`delete`.
    pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
        self.inner.subscribe_local_updates(callback)
    }

    /// Registers `callback` to receive the `EphemeralStoreEvent`s emitted by this store.
    pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
        self.inner.subscribe(callback)
    }
}
350
/// Shared state behind an `EphemeralStore` handle.
struct EphemeralStoreInner {
    /// Entries by key; an entry whose value is `None` records a deletion.
    states: Mutex<FxHashMap<String, State>>,
    /// Subscribers that receive the encoded bytes of local changes.
    local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
    /// Subscribers that receive `EphemeralStoreEvent`s.
    subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
    /// Maximum entry age before it is considered outdated.
    timeout: AtomicI64,
}
357
358impl std::fmt::Debug for EphemeralStoreInner {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 write!(
361 f,
362 "AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
363 self.states, self.timeout
364 )
365 }
366}
367
/// Wire representation of one store entry produced by
/// `EphemeralStoreInner::encode` and `encode_all`.
#[derive(Serialize, Deserialize)]
struct EncodedState<'a> {
    /// Entry key, borrowed rather than copied.
    #[serde(borrow)]
    key: &'a str,
    /// Entry value; `None` records a deletion.
    value: Option<LoroValue>,
    /// Timestamp at which the entry was written by its author.
    timestamp: i64,
}
375
/// Decoding counterpart of `EncodedState` used by `EphemeralStoreInner::apply`.
///
/// It has the same field layout, but `value` is read through the
/// depth-limited deserializer.
#[derive(Deserialize)]
struct DecodedState<'a> {
    /// Entry key, borrowed from the input bytes.
    #[serde(borrow)]
    key: &'a str,
    /// Entry value; `None` records a deletion. Rejected if it nests deeper
    /// than `EPHEMERAL_VALUE_MAX_DEPTH`.
    #[serde(deserialize_with = "deserialize_optional_depth_limited_loro_value")]
    value: Option<LoroValue>,
    /// Timestamp at which the entry was written by its author.
    timestamp: i64,
}
384
/// Maximum number of nested `List`/`Map` levels accepted when decoding a
/// `LoroValue` through the depth-limited deserializer; deeper input is
/// rejected with a deserialization error.
const EPHEMERAL_VALUE_MAX_DEPTH: usize = 512;
386
387fn deserialize_optional_depth_limited_loro_value<'de, D>(
388 deserializer: D,
389) -> Result<Option<LoroValue>, D::Error>
390where
391 D: Deserializer<'de>,
392{
393 Option::<DepthLimitedLoroValue>::deserialize(deserializer)
394 .map(|value| value.map(DepthLimitedLoroValue::into_inner))
395}
396
397fn deserialize_depth_limited_loro_value<'de, D>(deserializer: D) -> Result<LoroValue, D::Error>
398where
399 D: Deserializer<'de>,
400{
401 LoroValueSeed {
402 remaining_depth: EPHEMERAL_VALUE_MAX_DEPTH,
403 }
404 .deserialize(deserializer)
405}
406
/// Newtype whose `Deserialize` impl uses the depth-limited decoder, so it can
/// be nested inside standard containers such as `Option`.
struct DepthLimitedLoroValue(LoroValue);
408
409impl DepthLimitedLoroValue {
410 fn into_inner(self) -> LoroValue {
411 self.0
412 }
413}
414
415impl<'de> Deserialize<'de> for DepthLimitedLoroValue {
416 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
417 where
418 D: Deserializer<'de>,
419 {
420 deserialize_depth_limited_loro_value(deserializer).map(Self)
421 }
422}
423
/// Deserialization seed for one `LoroValue` that carries the remaining
/// nesting budget.
#[derive(Clone, Copy)]
struct LoroValueSeed {
    /// How many more `List`/`Map` levels may be entered below this value.
    remaining_depth: usize,
}
428
429impl LoroValueSeed {
430 fn nested<E>(self) -> Result<Self, E>
431 where
432 E: serde::de::Error,
433 {
434 let remaining_depth = self
435 .remaining_depth
436 .checked_sub(1)
437 .ok_or_else(|| E::custom("LoroValue nesting depth exceeded"))?;
438 Ok(Self { remaining_depth })
439 }
440}
441
442impl<'de> DeserializeSeed<'de> for LoroValueSeed {
443 type Value = LoroValue;
444
445 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
446 where
447 D: Deserializer<'de>,
448 {
449 if deserializer.is_human_readable() {
450 return Err(serde::de::Error::custom(
451 "human-readable LoroValue is not supported by the awareness decoder",
452 ));
453 }
454
455 deserializer.deserialize_enum(
456 "LoroValue",
457 &[
458 "Null",
459 "Bool",
460 "Double",
461 "I32",
462 "String",
463 "List",
464 "Map",
465 "Container",
466 "Binary",
467 ],
468 LoroValueSeedVisitor { seed: self },
469 )
470 }
471}
472
/// Visitor that builds a `LoroValue` from its enum encoding.
struct LoroValueSeedVisitor {
    /// Depth budget of the value being visited.
    seed: LoroValueSeed,
}
476
477impl<'de> serde::de::Visitor<'de> for LoroValueSeedVisitor {
478 type Value = LoroValue;
479
480 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
481 formatter.write_str("a depth-limited LoroValue")
482 }
483
484 fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
485 where
486 A: EnumAccess<'de>,
487 {
488 match data.variant()? {
489 (DepthLimitedLoroValueField::Null, v) => {
490 v.unit_variant()?;
491 Ok(LoroValue::Null)
492 }
493 (DepthLimitedLoroValueField::Bool, v) => v.newtype_variant().map(LoroValue::Bool),
494 (DepthLimitedLoroValueField::Double, v) => v.newtype_variant().map(LoroValue::Double),
495 (DepthLimitedLoroValueField::I32, v) => v.newtype_variant().map(LoroValue::I64),
496 (DepthLimitedLoroValueField::String, v) => v
497 .newtype_variant()
498 .map(|value: String| LoroValue::String(value.into())),
499 (DepthLimitedLoroValueField::List, v) => v
500 .newtype_variant_seed(LoroListSeed {
501 value_seed: self.seed.nested()?,
502 })
503 .map(|value| LoroValue::List(value.into())),
504 (DepthLimitedLoroValueField::Map, v) => v
505 .newtype_variant_seed(LoroMapSeed {
506 value_seed: self.seed.nested()?,
507 })
508 .map(|value| LoroValue::Map(value.into())),
509 (DepthLimitedLoroValueField::Container, v) => {
510 v.newtype_variant().map(LoroValue::Container)
511 }
512 (DepthLimitedLoroValueField::Binary, v) => v
513 .newtype_variant()
514 .map(|value: Vec<u8>| LoroValue::Binary(value.into())),
515 }
516 }
517}
518
/// Variant tag of an encoded `LoroValue`, in the same order as the variant
/// name list passed to `deserialize_enum` by `LoroValueSeed`.
#[derive(Deserialize)]
enum DepthLimitedLoroValueField {
    Null,
    Bool,
    Double,
    /// Decoded into `LoroValue::I64` by `LoroValueSeedVisitor`.
    I32,
    String,
    List,
    Map,
    Container,
    Binary,
}
531
/// Seed for the payload of a `List` variant.
struct LoroListSeed {
    /// Seed (with depth budget) applied to every list element.
    value_seed: LoroValueSeed,
}
535
536impl<'de> DeserializeSeed<'de> for LoroListSeed {
537 type Value = Vec<LoroValue>;
538
539 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
540 where
541 D: Deserializer<'de>,
542 {
543 deserializer.deserialize_seq(LoroListVisitor {
544 value_seed: self.value_seed,
545 })
546 }
547}
548
/// Visitor that collects a sequence of depth-limited `LoroValue`s.
struct LoroListVisitor {
    /// Seed (with depth budget) applied to every element.
    value_seed: LoroValueSeed,
}
552
553impl<'de> serde::de::Visitor<'de> for LoroListVisitor {
554 type Value = Vec<LoroValue>;
555
556 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
557 formatter.write_str("a depth-limited LoroValue list")
558 }
559
560 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
561 where
562 A: SeqAccess<'de>,
563 {
564 let mut list = Vec::new();
565 while let Some(value) = seq.next_element_seed(self.value_seed)? {
566 list.push(value);
567 }
568 Ok(list)
569 }
570}
571
/// Seed for the payload of a `Map` variant.
struct LoroMapSeed {
    /// Seed (with depth budget) applied to every map value.
    value_seed: LoroValueSeed,
}
575
576impl<'de> DeserializeSeed<'de> for LoroMapSeed {
577 type Value = FxHashMap<String, LoroValue>;
578
579 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
580 where
581 D: Deserializer<'de>,
582 {
583 deserializer.deserialize_map(LoroMapVisitor {
584 value_seed: self.value_seed,
585 })
586 }
587}
588
/// Visitor that collects a string-keyed map of depth-limited `LoroValue`s.
struct LoroMapVisitor {
    /// Seed (with depth budget) applied to every map value.
    value_seed: LoroValueSeed,
}
592
593impl<'de> serde::de::Visitor<'de> for LoroMapVisitor {
594 type Value = FxHashMap<String, LoroValue>;
595
596 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
597 formatter.write_str("a depth-limited LoroValue map")
598 }
599
600 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
601 where
602 A: MapAccess<'de>,
603 {
604 let mut value = FxHashMap::default();
605 while let Some(key) = map.next_key()? {
606 let entry = map.next_value_seed(self.value_seed)?;
607 value.insert(key, entry);
608 }
609 Ok(value)
610 }
611}
612
/// One entry of the ephemeral store.
#[derive(Debug, Clone)]
struct State {
    /// Current value; `None` records that the key was deleted.
    state: Option<LoroValue>,
    /// Timestamp of the write that produced this entry.
    timestamp: i64,
}
618
619impl EphemeralStoreInner {
620 pub fn new(timeout: i64) -> EphemeralStoreInner {
621 EphemeralStoreInner {
622 timeout: AtomicI64::new(timeout),
623 states: Mutex::new(FxHashMap::default()),
624 local_subs: SubscriberSetWithQueue::new(),
625 subscribers: SubscriberSetWithQueue::new(),
626 }
627 }
628
629 pub fn encode(&self, key: &str) -> Vec<u8> {
630 let mut peers_info = Vec::new();
631 let now = get_sys_timestamp() as Timestamp;
632 let states = self.states.lock();
633 if let Some(peer_state) = states.get(key) {
634 if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
635 {
636 return vec![];
637 }
638 let encoded_peer_info = EncodedState {
639 key,
640 value: peer_state.state.clone(),
641 timestamp: peer_state.timestamp,
642 };
643 peers_info.push(encoded_peer_info);
644 }
645
646 postcard::to_allocvec(&peers_info).unwrap()
647 }
648
649 pub fn encode_all(&self) -> Vec<u8> {
650 let mut peers_info = Vec::new();
651 let now = get_sys_timestamp() as Timestamp;
652 let states = self.states.lock();
653 for (key, peer_state) in states.iter() {
654 if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
655 {
656 continue;
657 }
658 let encoded_peer_info = EncodedState {
659 key,
660 value: peer_state.state.clone(),
661 timestamp: peer_state.timestamp,
662 };
663 peers_info.push(encoded_peer_info);
664 }
665 postcard::to_allocvec(&peers_info).unwrap()
666 }
667
668 pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
669 let peers_info = match postcard::from_bytes::<Vec<DecodedState>>(data) {
670 Ok(ans) => ans,
671 Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
672 };
673
674 let mut updated_keys = Vec::new();
675 let mut added_keys = Vec::new();
676 let mut removed_keys = Vec::new();
677 let now = get_sys_timestamp() as Timestamp;
678 let timeout = self.timeout.load(std::sync::atomic::Ordering::Relaxed);
679 let mut states = self.states.lock();
680 for DecodedState {
681 key,
682 value: record,
683 timestamp,
684 } in peers_info
685 {
686 if now - timestamp > timeout {
687 continue;
688 }
689
690 match states.get_mut(key) {
691 Some(peer_info) if peer_info.timestamp >= timestamp => {
692 }
694 _ => {
695 let old = states.insert(
696 key.to_string(),
697 State {
698 state: record.clone(),
699 timestamp,
700 },
701 );
702 match (old, record) {
703 (Some(_), Some(_)) => updated_keys.push(key.to_string()),
704 (None, Some(_)) => added_keys.push(key.to_string()),
705 (Some(_), None) => removed_keys.push(key.to_string()),
706 (None, None) => {}
707 }
708 }
709 }
710 }
711
712 drop(states);
713 if !self.subscribers.inner().is_empty() {
714 self.subscribers.emit(
715 &(),
716 EphemeralStoreEvent {
717 by: EphemeralEventTrigger::Import,
718 added: Arc::new(added_keys),
719 updated: Arc::new(updated_keys),
720 removed: Arc::new(removed_keys),
721 },
722 );
723 }
724
725 Ok(())
726 }
727
728 pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
729 self._set_local_state(key, Some(value.into()));
730 }
731
732 pub fn delete(&self, key: &str) {
733 self._set_local_state(key, None);
734 }
735
736 pub fn get(&self, key: &str) -> Option<LoroValue> {
737 let states = self.states.lock();
738 states.get(key).and_then(|x| x.state.clone())
739 }
740
741 pub fn remove_outdated(&self) {
742 let now = get_sys_timestamp() as Timestamp;
743 let mut removed = Vec::new();
744 let mut states = self.states.lock();
745 states.retain(|key, state| {
746 if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
747 if state.state.is_some() {
748 removed.push(key.clone());
749 }
750 false
751 } else {
752 true
753 }
754 });
755 drop(states);
756 if !self.subscribers.inner().is_empty() {
757 self.subscribers.emit(
758 &(),
759 EphemeralStoreEvent {
760 by: EphemeralEventTrigger::Timeout,
761 added: Arc::new(Vec::new()),
762 updated: Arc::new(Vec::new()),
763 removed: Arc::new(removed),
764 },
765 );
766 }
767 }
768
769 pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
770 let states = self.states.lock();
771 states
772 .iter()
773 .filter(|(_, v)| v.state.is_some())
774 .map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
775 .collect()
776 }
777
778 pub fn keys(&self) -> Vec<String> {
779 let states = self.states.lock();
780 states
781 .keys()
782 .filter(|&k| states.get(k).unwrap().state.is_some())
783 .map(|s| s.to_string())
784 .collect()
785 }
786
787 pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
804 let (sub, activate) = self.local_subs.inner().insert((), callback);
805 activate();
806 sub
807 }
808
809 pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
810 let (sub, activate) = self.subscribers.inner().insert((), callback);
811 activate();
812 sub
813 }
814
815 fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
816 let is_delete = value.is_none();
817 let mut states = self.states.lock();
818 let old = states.insert(
819 key.to_string(),
820 State {
821 state: value,
822 timestamp: get_sys_timestamp() as Timestamp,
823 },
824 );
825
826 drop(states);
827 if !self.local_subs.inner().is_empty() {
828 self.local_subs.emit(&(), self.encode(key));
829 }
830 if !self.subscribers.inner().is_empty() {
831 if old.is_some() {
832 self.subscribers.emit(
833 &(),
834 EphemeralStoreEvent {
835 by: EphemeralEventTrigger::Local,
836 added: Arc::new(Vec::new()),
837 updated: if !is_delete {
838 Arc::new(vec![key.to_string()])
839 } else {
840 Arc::new(Vec::new())
841 },
842 removed: if !is_delete {
843 Arc::new(Vec::new())
844 } else {
845 Arc::new(vec![key.to_string()])
846 },
847 },
848 );
849 } else if !is_delete {
850 self.subscribers.emit(
851 &(),
852 EphemeralStoreEvent {
853 by: EphemeralEventTrigger::Local,
854 added: Arc::new(vec![key.to_string()]),
855 updated: Arc::new(Vec::new()),
856 removed: Arc::new(Vec::new()),
857 },
858 );
859 }
860 }
861 }
862}