use crate::sync::Mutex;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use loro_common::{LoroValue, PeerID};
use rustc_hash::FxHashMap;
use serde::de::{DeserializeSeed, EnumAccess, MapAccess, SeqAccess, VariantAccess};
use serde::{Deserialize, Deserializer, Serialize};
use crate::change::{get_sys_timestamp, Timestamp};
use crate::{SubscriberSetWithQueue, Subscription};
/// Legacy presence store that keeps one `PeerInfo` per `PeerID`.
///
/// Entries older than `timeout` are skipped by `encode`/`encode_all` and
/// dropped by `remove_outdated`. Deprecated in favour of `EphemeralStore`.
#[derive(Debug, Clone)]
#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
pub struct Awareness {
/// ID of the local peer; `set_local_state` writes under this key.
peer: PeerID,
/// Latest known state of every peer (the local peer appears once it has set a state).
peers: FxHashMap<PeerID, PeerInfo>,
/// Maximum age of an entry, compared against differences of `get_sys_timestamp()`
/// values (presumably milliseconds — TODO confirm the unit).
timeout: i64,
}
/// State that `Awareness` tracks for a single peer.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// The value last published by the peer.
pub state: LoroValue,
/// Version counter: incremented on every local update, and an incoming
/// entry only replaces a stored one when its counter is strictly greater.
pub counter: i32,
/// Local `get_sys_timestamp()` reading taken when this entry was last written.
pub timestamp: i64,
}
/// Wire representation of one peer entry, serialized with postcard by
/// `Awareness::encode` and `Awareness::encode_all`.
///
/// postcard is not self-describing, so the field order below is part of the
/// wire format and must stay in sync with `DecodedPeerInfo`.
#[derive(Serialize, Deserialize)]
struct EncodedPeerInfo {
/// Peer the record belongs to.
peer: PeerID,
/// Version counter of the record.
counter: i32,
/// The peer's published state.
record: LoroValue,
}
/// Decode-side counterpart of `EncodedPeerInfo`, used by `Awareness::apply`.
///
/// It has the same field layout, but `record` is read through the
/// depth-limited `LoroValue` deserializer instead of the default one.
#[derive(Deserialize)]
struct DecodedPeerInfo {
/// Peer the record belongs to.
peer: PeerID,
/// Version counter of the record.
counter: i32,
/// The peer's published state; nesting deeper than `EPHEMERAL_VALUE_MAX_DEPTH` is rejected.
#[serde(deserialize_with = "deserialize_depth_limited_loro_value")]
record: LoroValue,
}
#[allow(deprecated)]
impl Awareness {
pub fn new(peer: PeerID, timeout: i64) -> Awareness {
Awareness {
peer,
timeout,
peers: FxHashMap::default(),
}
}
pub fn encode(&self, peers: &[PeerID]) -> Vec<u8> {
let mut peers_info = Vec::new();
let now = get_sys_timestamp() as Timestamp;
for peer in peers {
if let Some(peer_info) = self.peers.get(peer) {
if now - peer_info.timestamp > self.timeout {
continue;
}
let encoded_peer_info = EncodedPeerInfo {
peer: *peer,
record: peer_info.state.clone(),
counter: peer_info.counter,
};
peers_info.push(encoded_peer_info);
}
}
postcard::to_allocvec(&peers_info).unwrap()
}
pub fn encode_all(&self) -> Vec<u8> {
let mut peers_info = Vec::new();
let now = get_sys_timestamp() as Timestamp;
for (peer, peer_info) in self.peers.iter() {
if now - peer_info.timestamp > self.timeout {
continue;
}
let encoded_peer_info = EncodedPeerInfo {
peer: *peer,
record: peer_info.state.clone(),
counter: peer_info.counter,
};
peers_info.push(encoded_peer_info);
}
postcard::to_allocvec(&peers_info).unwrap()
}
pub fn apply(&mut self, encoded_peers_info: &[u8]) -> (Vec<PeerID>, Vec<PeerID>) {
let peers_info: Vec<DecodedPeerInfo> =
postcard::from_bytes(encoded_peers_info).expect("Failed to decode awareness data");
let mut changed_peers = Vec::new();
let mut added_peers = Vec::new();
let now = get_sys_timestamp() as Timestamp;
for peer_info in peers_info {
match self.peers.get(&peer_info.peer) {
Some(x) if x.counter >= peer_info.counter || peer_info.peer == self.peer => {
}
_ => {
let old = self.peers.insert(
peer_info.peer,
PeerInfo {
counter: peer_info.counter,
state: peer_info.record,
timestamp: now,
},
);
if old.is_some() {
changed_peers.push(peer_info.peer);
} else {
added_peers.push(peer_info.peer);
}
}
}
}
(changed_peers, added_peers)
}
pub fn set_local_state(&mut self, value: impl Into<LoroValue>) {
self._set_local_state(value.into());
}
fn _set_local_state(&mut self, value: LoroValue) {
let peer = self.peers.entry(self.peer).or_insert_with(|| PeerInfo {
state: Default::default(),
counter: 0,
timestamp: 0,
});
peer.state = value;
peer.counter += 1;
peer.timestamp = get_sys_timestamp() as Timestamp;
}
pub fn get_local_state(&self) -> Option<LoroValue> {
self.peers.get(&self.peer).map(|x| x.state.clone())
}
pub fn remove_outdated(&mut self) -> Vec<PeerID> {
let now = get_sys_timestamp() as Timestamp;
let mut removed = Vec::new();
self.peers.retain(|id, v| {
if now - v.timestamp > self.timeout {
removed.push(*id);
false
} else {
true
}
});
removed
}
pub fn get_all_states(&self) -> &FxHashMap<PeerID, PeerInfo> {
&self.peers
}
pub fn peer(&self) -> PeerID {
self.peer
}
}
/// What caused an `EphemeralStoreEvent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EphemeralEventTrigger {
/// A local `set` or `delete` on the store.
Local,
/// Remote data merged through `apply`.
Import,
/// Entries dropped by `remove_outdated`.
Timeout,
}
/// Change notification delivered to `EphemeralSubscriber` callbacks.
#[derive(Debug, Clone)]
pub struct EphemeralStoreEvent {
/// The operation that produced this event.
pub by: EphemeralEventTrigger,
/// Keys that received a value and had no entry in the store before.
pub added: Arc<Vec<String>>,
/// Keys that received a value and already had an entry in the store.
pub updated: Arc<Vec<String>>,
/// Keys whose entry was replaced by a deletion marker, or that were
/// dropped by a timeout while still holding a value.
pub removed: Arc<Vec<String>>,
}
/// Callback registered through `subscribe_local_updates`. It is emitted the
/// encoded bytes of a key after each local `set`/`delete`.
/// NOTE(review): the meaning of the returned `bool` is defined by
/// `SubscriberSetWithQueue` (presumably "keep the subscription") — confirm there.
pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
/// Callback registered through `subscribe`. It is emitted an
/// `EphemeralStoreEvent` for local changes, imports and timeouts.
/// NOTE(review): the meaning of the returned `bool` is defined by
/// `SubscriberSetWithQueue` (presumably "keep the subscription") — confirm there.
pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;
/// Key/value store for short-lived state, where entries expire after a timeout.
///
/// This is a handle around a shared `EphemeralStoreInner`: cloning it clones
/// the `Arc`, so all clones operate on the same data and subscribers.
#[derive(Debug, Clone)]
pub struct EphemeralStore {
/// Shared state; every public method delegates to it.
inner: Arc<EphemeralStoreInner>,
}
impl EphemeralStore {
/// Creates an empty store. `timeout` is the maximum age an entry may reach
/// before it is treated as outdated.
pub fn new(timeout: i64) -> Self {
Self {
inner: Arc::new(EphemeralStoreInner::new(timeout)),
}
}
/// Encodes the entry stored under `key` with postcard.
///
/// An unknown key yields an encoded empty list; an entry older than the
/// timeout yields an empty byte vector.
pub fn encode(&self, key: &str) -> Vec<u8> {
self.inner.encode(key)
}
/// Encodes every entry that is not older than the timeout, deletion
/// markers included.
pub fn encode_all(&self) -> Vec<u8> {
self.inner.encode_all()
}
/// Merges data produced by `encode`/`encode_all` into this store and
/// notifies subscribers with an `Import` event.
///
/// An incoming entry is stored only when it is not outdated and its
/// timestamp is strictly newer than the one already stored for the key.
///
/// # Errors
///
/// Returns a message describing the failure when `data` cannot be decoded.
pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
self.inner.apply(data)
}
/// Stores `value` under `key` with the current timestamp and notifies
/// local-update callbacks and subscribers.
pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
self.inner.set(key, value)
}
/// Replaces the entry under `key` with a deletion marker carrying the
/// current timestamp.
pub fn delete(&self, key: &str) {
self.inner.delete(key)
}
/// Returns a copy of the value under `key`; `None` when the key is unknown
/// or deleted.
pub fn get(&self, key: &str) -> Option<LoroValue> {
self.inner.get(key)
}
/// Drops every entry older than the timeout and emits a `Timeout` event
/// listing the dropped keys that still held a value.
pub fn remove_outdated(&self) {
self.inner.remove_outdated()
}
/// Returns a snapshot of all keys that currently hold a value.
pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
self.inner.get_all_states()
}
/// Returns the keys that currently hold a value.
pub fn keys(&self) -> Vec<String> {
self.inner.keys()
}
/// Registers `callback` to be emitted the encoded bytes of each local
/// `set`/`delete`, and returns the `Subscription` for that registration.
pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
self.inner.subscribe_local_updates(callback)
}
/// Registers `callback` to be emitted an `EphemeralStoreEvent` for local
/// changes, imports and timeouts, and returns the `Subscription` for it.
pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
self.inner.subscribe(callback)
}
}
/// Shared state behind every `EphemeralStore` handle.
struct EphemeralStoreInner {
/// Entry per key; a `State` whose value is `None` is a deletion marker.
states: Mutex<FxHashMap<String, State>>,
/// Callbacks that are emitted the encoded bytes of each local change.
local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
/// Callbacks that are emitted an `EphemeralStoreEvent` for every kind of change.
subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
/// Maximum age of an entry before it is treated as outdated.
timeout: AtomicI64,
}
impl std::fmt::Debug for EphemeralStoreInner {
    /// Formats the store for diagnostics.
    ///
    /// Only `states` and `timeout` are printed; the subscriber sets hold
    /// boxed closures and are left out. The output is labelled with the
    /// type's actual name (it previously printed the stale name
    /// `AwarenessV2`), and `debug_struct` makes `{:#?}` pretty-printing work.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EphemeralStoreInner")
            .field("states", &self.states)
            .field("timeout", &self.timeout)
            .finish()
    }
}
/// Wire representation of one store entry, serialized with postcard by
/// `EphemeralStoreInner::encode` and `encode_all`.
///
/// postcard is not self-describing, so the field order below is part of the
/// wire format and must stay in sync with `DecodedState`.
#[derive(Serialize, Deserialize)]
struct EncodedState<'a> {
/// Key of the entry, borrowed rather than copied.
#[serde(borrow)]
key: &'a str,
/// The stored value; `None` is a deletion marker.
value: Option<LoroValue>,
/// Timestamp recorded when the entry was written.
timestamp: i64,
}
/// Decode-side counterpart of `EncodedState`, used by `EphemeralStoreInner::apply`.
///
/// It has the same field layout, but `value` is read through the
/// depth-limited `LoroValue` deserializer instead of the default one.
#[derive(Deserialize)]
struct DecodedState<'a> {
/// Key of the entry, borrowed from the input buffer.
#[serde(borrow)]
key: &'a str,
/// The transmitted value; `None` is a deletion marker. Nesting deeper than
/// `EPHEMERAL_VALUE_MAX_DEPTH` is rejected.
#[serde(deserialize_with = "deserialize_optional_depth_limited_loro_value")]
value: Option<LoroValue>,
/// Timestamp recorded by the sender when the entry was written.
timestamp: i64,
}
/// Maximum number of nested list/map levels accepted when decoding a
/// `LoroValue` received by the awareness/ephemeral decoders. Deeper payloads
/// fail with "LoroValue nesting depth exceeded" (presumably to bound
/// recursion on untrusted input — confirm the intent).
const EPHEMERAL_VALUE_MAX_DEPTH: usize = 512;
/// `deserialize_with` helper for `Option<LoroValue>` fields.
///
/// Reads an optional value through `DepthLimitedLoroValue`, so a present
/// value is subject to the nesting limit, then unwraps the newtype.
fn deserialize_optional_depth_limited_loro_value<'de, D>(
    deserializer: D,
) -> Result<Option<LoroValue>, D::Error>
where
    D: Deserializer<'de>,
{
    let wrapped = Option::<DepthLimitedLoroValue>::deserialize(deserializer)?;
    Ok(wrapped.map(|limited| limited.into_inner()))
}
/// `deserialize_with` helper for `LoroValue` fields.
///
/// Decodes the value with a `LoroValueSeed` that starts with a budget of
/// `EPHEMERAL_VALUE_MAX_DEPTH` nested levels.
fn deserialize_depth_limited_loro_value<'de, D>(deserializer: D) -> Result<LoroValue, D::Error>
where
    D: Deserializer<'de>,
{
    let root_seed = LoroValueSeed {
        remaining_depth: EPHEMERAL_VALUE_MAX_DEPTH,
    };
    root_seed.deserialize(deserializer)
}
/// Newtype around `LoroValue` whose `Deserialize` impl enforces the nesting
/// limit. It lets the limit be applied inside generic containers such as `Option`.
struct DepthLimitedLoroValue(LoroValue);

impl DepthLimitedLoroValue {
    /// Unwraps the decoded value.
    fn into_inner(self) -> LoroValue {
        let Self(value) = self;
        value
    }
}
impl<'de> Deserialize<'de> for DepthLimitedLoroValue {
    /// Decodes a `LoroValue` through the depth-limited deserializer and wraps it.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let value = deserialize_depth_limited_loro_value(deserializer)?;
        Ok(Self(value))
    }
}
/// Deserialization seed for `LoroValue` that carries the remaining nesting budget.
#[derive(Clone, Copy)]
struct LoroValueSeed {
    /// How many more list/map levels may still be entered below this value.
    remaining_depth: usize,
}

impl LoroValueSeed {
    /// Returns the seed for the children of a list or map, with one level
    /// less budget.
    ///
    /// Fails with a custom serde error once the budget is exhausted.
    fn nested<E>(self) -> Result<Self, E>
    where
        E: serde::de::Error,
    {
        match self.remaining_depth.checked_sub(1) {
            Some(remaining_depth) => Ok(Self { remaining_depth }),
            None => Err(E::custom("LoroValue nesting depth exceeded")),
        }
    }
}
impl<'de> DeserializeSeed<'de> for LoroValueSeed {
type Value = LoroValue;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
if deserializer.is_human_readable() {
return Err(serde::de::Error::custom(
"human-readable LoroValue is not supported by the awareness decoder",
));
}
deserializer.deserialize_enum(
"LoroValue",
&[
"Null",
"Bool",
"Double",
"I32",
"String",
"List",
"Map",
"Container",
"Binary",
],
LoroValueSeedVisitor { seed: self },
)
}
}
struct LoroValueSeedVisitor {
seed: LoroValueSeed,
}
impl<'de> serde::de::Visitor<'de> for LoroValueSeedVisitor {
type Value = LoroValue;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a depth-limited LoroValue")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: EnumAccess<'de>,
{
match data.variant()? {
(DepthLimitedLoroValueField::Null, v) => {
v.unit_variant()?;
Ok(LoroValue::Null)
}
(DepthLimitedLoroValueField::Bool, v) => v.newtype_variant().map(LoroValue::Bool),
(DepthLimitedLoroValueField::Double, v) => v.newtype_variant().map(LoroValue::Double),
(DepthLimitedLoroValueField::I32, v) => v.newtype_variant().map(LoroValue::I64),
(DepthLimitedLoroValueField::String, v) => v
.newtype_variant()
.map(|value: String| LoroValue::String(value.into())),
(DepthLimitedLoroValueField::List, v) => v
.newtype_variant_seed(LoroListSeed {
value_seed: self.seed.nested()?,
})
.map(|value| LoroValue::List(value.into())),
(DepthLimitedLoroValueField::Map, v) => v
.newtype_variant_seed(LoroMapSeed {
value_seed: self.seed.nested()?,
})
.map(|value| LoroValue::Map(value.into())),
(DepthLimitedLoroValueField::Container, v) => {
v.newtype_variant().map(LoroValue::Container)
}
(DepthLimitedLoroValueField::Binary, v) => v
.newtype_variant()
.map(|value: Vec<u8>| LoroValue::Binary(value.into())),
}
}
}
/// Variant tag of an encoded `LoroValue`, as read by `LoroValueSeedVisitor`.
///
/// The variants appear in the same order as the names passed to
/// `deserialize_enum` in `LoroValueSeed`; keep the two lists in sync.
/// `I32` is the tag under which `LoroValue::I64` payloads are decoded.
#[derive(Deserialize)]
enum DepthLimitedLoroValueField {
Null,
Bool,
Double,
I32,
String,
List,
Map,
Container,
Binary,
}
/// Seed that decodes the payload of a `List` variant.
struct LoroListSeed {
    /// Seed (and depth budget) applied to every element.
    value_seed: LoroValueSeed,
}

impl<'de> DeserializeSeed<'de> for LoroListSeed {
    type Value = Vec<LoroValue>;

    /// Reads a sequence and decodes each element with `value_seed`.
    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: Deserializer<'de>,
    {
        let Self { value_seed } = self;
        let visitor = LoroListVisitor { value_seed };
        deserializer.deserialize_seq(visitor)
    }
}
/// Visitor that collects the elements of an encoded list.
struct LoroListVisitor {
    /// Seed (and depth budget) applied to every element.
    value_seed: LoroValueSeed,
}

impl<'de> serde::de::Visitor<'de> for LoroListVisitor {
    type Value = Vec<LoroValue>;

    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("a depth-limited LoroValue list")
    }

    /// Decodes elements one by one until the sequence is exhausted.
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        let Self { value_seed } = self;
        let mut items = Vec::new();
        loop {
            match seq.next_element_seed(value_seed)? {
                Some(item) => items.push(item),
                None => return Ok(items),
            }
        }
    }
}
/// Seed that decodes the payload of a `Map` variant.
struct LoroMapSeed {
    /// Seed (and depth budget) applied to every map value.
    value_seed: LoroValueSeed,
}

impl<'de> DeserializeSeed<'de> for LoroMapSeed {
    type Value = FxHashMap<String, LoroValue>;

    /// Reads a map and decodes each value with `value_seed`.
    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: Deserializer<'de>,
    {
        let Self { value_seed } = self;
        let visitor = LoroMapVisitor { value_seed };
        deserializer.deserialize_map(visitor)
    }
}
/// Visitor that collects the entries of an encoded map.
struct LoroMapVisitor {
    /// Seed (and depth budget) applied to every map value.
    value_seed: LoroValueSeed,
}

impl<'de> serde::de::Visitor<'de> for LoroMapVisitor {
    type Value = FxHashMap<String, LoroValue>;

    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("a depth-limited LoroValue map")
    }

    /// Decodes entries until the map is exhausted. A repeated key keeps the
    /// value that was decoded last.
    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
    where
        A: MapAccess<'de>,
    {
        let Self { value_seed } = self;
        let mut entries = FxHashMap::default();
        loop {
            let key: String = match map.next_key()? {
                Some(key) => key,
                None => break,
            };
            let value = map.next_value_seed(value_seed)?;
            entries.insert(key, value);
        }
        Ok(entries)
    }
}
/// One entry of the ephemeral store.
#[derive(Debug, Clone)]
struct State {
/// The stored value; `None` marks the key as deleted.
state: Option<LoroValue>,
/// Timestamp of the write that produced this entry (local time for local
/// writes, the sender's timestamp for imported entries).
timestamp: i64,
}
impl EphemeralStoreInner {
pub fn new(timeout: i64) -> EphemeralStoreInner {
EphemeralStoreInner {
timeout: AtomicI64::new(timeout),
states: Mutex::new(FxHashMap::default()),
local_subs: SubscriberSetWithQueue::new(),
subscribers: SubscriberSetWithQueue::new(),
}
}
pub fn encode(&self, key: &str) -> Vec<u8> {
let mut peers_info = Vec::new();
let now = get_sys_timestamp() as Timestamp;
let states = self.states.lock();
if let Some(peer_state) = states.get(key) {
if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
{
return vec![];
}
let encoded_peer_info = EncodedState {
key,
value: peer_state.state.clone(),
timestamp: peer_state.timestamp,
};
peers_info.push(encoded_peer_info);
}
postcard::to_allocvec(&peers_info).unwrap()
}
pub fn encode_all(&self) -> Vec<u8> {
let mut peers_info = Vec::new();
let now = get_sys_timestamp() as Timestamp;
let states = self.states.lock();
for (key, peer_state) in states.iter() {
if now - peer_state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed)
{
continue;
}
let encoded_peer_info = EncodedState {
key,
value: peer_state.state.clone(),
timestamp: peer_state.timestamp,
};
peers_info.push(encoded_peer_info);
}
postcard::to_allocvec(&peers_info).unwrap()
}
pub fn apply(&self, data: &[u8]) -> Result<(), Box<str>> {
let peers_info = match postcard::from_bytes::<Vec<DecodedState>>(data) {
Ok(ans) => ans,
Err(err) => return Err(format!("Failed to decode data: {}", err).into()),
};
let mut updated_keys = Vec::new();
let mut added_keys = Vec::new();
let mut removed_keys = Vec::new();
let now = get_sys_timestamp() as Timestamp;
let timeout = self.timeout.load(std::sync::atomic::Ordering::Relaxed);
let mut states = self.states.lock();
for DecodedState {
key,
value: record,
timestamp,
} in peers_info
{
if now - timestamp > timeout {
continue;
}
match states.get_mut(key) {
Some(peer_info) if peer_info.timestamp >= timestamp => {
}
_ => {
let old = states.insert(
key.to_string(),
State {
state: record.clone(),
timestamp,
},
);
match (old, record) {
(Some(_), Some(_)) => updated_keys.push(key.to_string()),
(None, Some(_)) => added_keys.push(key.to_string()),
(Some(_), None) => removed_keys.push(key.to_string()),
(None, None) => {}
}
}
}
}
drop(states);
if !self.subscribers.inner().is_empty() {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Import,
added: Arc::new(added_keys),
updated: Arc::new(updated_keys),
removed: Arc::new(removed_keys),
},
);
}
Ok(())
}
pub fn set(&self, key: &str, value: impl Into<LoroValue>) {
self._set_local_state(key, Some(value.into()));
}
pub fn delete(&self, key: &str) {
self._set_local_state(key, None);
}
pub fn get(&self, key: &str) -> Option<LoroValue> {
let states = self.states.lock();
states.get(key).and_then(|x| x.state.clone())
}
pub fn remove_outdated(&self) {
let now = get_sys_timestamp() as Timestamp;
let mut removed = Vec::new();
let mut states = self.states.lock();
states.retain(|key, state| {
if now - state.timestamp > self.timeout.load(std::sync::atomic::Ordering::Relaxed) {
if state.state.is_some() {
removed.push(key.clone());
}
false
} else {
true
}
});
drop(states);
if !self.subscribers.inner().is_empty() {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Timeout,
added: Arc::new(Vec::new()),
updated: Arc::new(Vec::new()),
removed: Arc::new(removed),
},
);
}
}
pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
let states = self.states.lock();
states
.iter()
.filter(|(_, v)| v.state.is_some())
.map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
.collect()
}
pub fn keys(&self) -> Vec<String> {
let states = self.states.lock();
states
.keys()
.filter(|&k| states.get(k).unwrap().state.is_some())
.map(|s| s.to_string())
.collect()
}
pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
let (sub, activate) = self.local_subs.inner().insert((), callback);
activate();
sub
}
pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
let (sub, activate) = self.subscribers.inner().insert((), callback);
activate();
sub
}
fn _set_local_state(&self, key: &str, value: Option<LoroValue>) {
let is_delete = value.is_none();
let mut states = self.states.lock();
let old = states.insert(
key.to_string(),
State {
state: value,
timestamp: get_sys_timestamp() as Timestamp,
},
);
drop(states);
if !self.local_subs.inner().is_empty() {
self.local_subs.emit(&(), self.encode(key));
}
if !self.subscribers.inner().is_empty() {
if old.is_some() {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Local,
added: Arc::new(Vec::new()),
updated: if !is_delete {
Arc::new(vec![key.to_string()])
} else {
Arc::new(Vec::new())
},
removed: if !is_delete {
Arc::new(Vec::new())
} else {
Arc::new(vec![key.to_string()])
},
},
);
} else if !is_delete {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Local,
added: Arc::new(vec![key.to_string()]),
updated: Arc::new(Vec::new()),
removed: Arc::new(Vec::new()),
},
);
}
}
}
}