iroh_smol_kv/
lib.rs

1//! A small, gossip-based, eventually consistent key-value store.
2//!
3//! The entry point is the [`Client`] struct, which can be created with a
4//! [`GossipTopic`] and a [`Config`].
5//!
6//! Writing to the store requires a [`WriteScope`], which is created
7//! from the client using a [`SecretKey`].
8//!
9//! You can read from the store with [`Client::get`], but mostly you will
10//! create a subscription using [`Client::subscribe`] and then get updates
11//! of type [`SubscribeItem`].
12//!
13//! Subscriptions can be restricted using a [`Filter`]. You can also specify
14//! if you want just the current state or future updates, using [`SubscribeMode`].
15//!
16//! Entries in the database expire after not being modified for a certain duration,
17//! which can be specified in the [`ExpiryConfig`].
18use std::{
19    collections::HashSet,
20    ops::{Bound, RangeBounds},
21    time::{Duration, SystemTime},
22};
23
24use bytes::{Bytes, BytesMut};
25use iroh::{NodeId, PublicKey, SecretKey};
26use iroh_gossip::api::{Event, GossipReceiver, GossipSender, GossipTopic};
27use irpc::{
28    channel::{mpsc, oneshot},
29    rpc_requests,
30};
31use n0_future::{FuturesUnordered, StreamExt, TryFutureExt, TryStreamExt, boxed::BoxFuture};
32pub use proto::SignedValue;
33use proto::{GossipMessage, SigningData};
34use rand::seq::SliceRandom;
35use serde::{Deserialize, Serialize};
36use snafu::Snafu;
37use sync_wrapper::SyncStream;
38use tokio::sync::broadcast;
39use tracing::{error, trace};
40use util::{current_timestamp, next_prefix, postcard_ser, to_nanos};
41
/// A stored entry as a `(scope, key, signed value)` triple.
type Entry = (PublicKey, Bytes, SignedValue);
43
44pub mod proto {
45    //! Gossip protocol messages and helpers
46    use std::time::SystemTime;
47
48    use bytes::Bytes;
49    use iroh::PublicKey;
50    use serde::{Deserialize, Serialize};
51    use serde_big_array::BigArray;
52
53    use crate::util::{DD, from_nanos};
54    #[derive(Clone, Serialize, Deserialize)]
55    pub struct SignedValue {
56        /// Timestamp in nanoseconds since epoch
57        pub timestamp: u64,
58        /// The actual value
59        pub value: Bytes,
60        /// Signature over (key, timestamp, value)
61        #[serde(with = "BigArray")]
62        pub signature: [u8; 64],
63    }
64
65    impl std::fmt::Debug for SignedValue {
66        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67            f.debug_tuple("SignedValue")
68                .field(&self.timestamp)
69                .field(&self.value)
70                .field(&DD(hex::encode(self.signature)))
71                .finish()
72        }
73    }
74
75    impl SignedValue {
76        pub fn system_time(&self) -> SystemTime {
77            from_nanos(self.timestamp)
78        }
79    }
80
81    #[derive(Debug, Serialize, Deserialize)]
82    pub enum GossipMessage {
83        SignedValue(PublicKey, Bytes, SignedValue),
84    }
85
86    #[derive(Debug, Clone, Serialize, Deserialize)]
87    pub(crate) struct SigningData<'a> {
88        pub key: &'a [u8],
89        pub timestamp: u64,
90        pub value: &'a [u8],
91    }
92}
93
/// Reasons why a signed value is rejected when inserting it into the local state.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
#[snafu(module)]
pub enum InsertError {
    // Signature verification against the scope's public key failed.
    #[snafu(transparent)]
    Signature {
        source: ed25519_dalek::SignatureError,
    },
    /// The value already stored for this key has an equal or newer timestamp.
    #[snafu(display("Value too old: existing timestamp {}, new timestamp {}", old, new))]
    ValueTooOld { old: u64, new: u64 },
}
105
/// Request to store an already signed value under `key` in `scope`.
#[derive(Debug, Serialize, Deserialize)]
struct Put {
    /// Public key of the scope the value belongs to.
    pub scope: PublicKey,
    /// Key within the scope.
    pub key: Bytes,
    /// The value with its timestamp and signature.
    pub value: SignedValue,
}
112
/// Request to look up the value currently stored under `key` in `scope`.
#[derive(Debug, Serialize, Deserialize)]
struct Get {
    /// Public key of the scope to look in.
    pub scope: PublicKey,
    /// Key within the scope.
    pub key: Bytes,
}
118
/// Selects whether a subscription delivers existing entries, future updates, or both.
#[derive(Debug, Serialize, Deserialize)]
pub enum SubscribeMode {
    /// Only send current values that match the filter
    Current,
    /// Send future values that match the filter
    Future,
    /// Send both current and future values that match the filter
    Both,
}
128
/// An item delivered to a subscriber.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SubscribeItem {
    /// A matching entry (scope, key, value)
    Entry(Entry),
    /// Marker that all current entries have been sent, and future entries will follow.
    ///
    /// You will get at most one CurrentDone message per subscription.
    CurrentDone,
    /// An entry that has expired (removed), as (scope, key, timestamp).
    ///
    /// The u64 is the timestamp when it was last modified.
    Expired((PublicKey, Bytes, u64)),
}
140
/// Item sent on the actor's internal broadcast channel; subscriptions filter
/// these and forward them as [`SubscribeItem`]s.
#[derive(Clone)]
pub(crate) enum BroadcastItem {
    /// An entry that was inserted or updated (scope, key, value)
    Entry(Entry),
    /// An entry that has expired (removed), as (scope, key, last modified timestamp)
    Expired((PublicKey, Bytes, u64)),
}
148
149impl From<BroadcastItem> for SubscribeItem {
150    fn from(item: BroadcastItem) -> Self {
151        match item {
152            BroadcastItem::Entry(entry) => SubscribeItem::Entry(entry),
153            BroadcastItem::Expired(entry) => SubscribeItem::Expired(entry),
154        }
155    }
156}
157
158impl BroadcastItem {
159    fn contained_in(&self, filter: &Filter) -> bool {
160        match self {
161            BroadcastItem::Entry((scope, key, value)) => {
162                filter.contains(scope, key, value.timestamp)
163            }
164            BroadcastItem::Expired((scope, key, _)) => filter.contains_key(scope, key),
165        }
166    }
167}
168
/// Options for a subscription: which items to deliver and how to filter them.
#[derive(Debug, Serialize, Deserialize)]
pub struct Subscribe {
    /// Whether to deliver current values, future values, or both.
    pub mode: SubscribeMode,
    /// Only items matching this filter are delivered.
    pub filter: Filter,
}
174
/// Request to join the given peers on the gossip topic.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct JoinPeers {
    /// Node ids of the peers to join.
    pub peers: Vec<NodeId>,
}
179
/// Request asking the actor to stop its run loop.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Shutdown;
182
// Requests a `Client` can send to the actor. The `rpc_requests` attribute
// names the per-request message enum `Message`, which the actor matches on.
// Each `#[rpc(tx = ...)]` gives the channel type used for the response.
#[derive(Debug, Serialize, Deserialize)]
#[rpc_requests(message = Message)]
enum Proto {
    // Store a signed value; answered with `()`.
    #[rpc(tx = oneshot::Sender<()>)]
    Put(Put),
    // Look up a value; answered with the stored signed value, if any.
    #[rpc(tx = oneshot::Sender<Option<SignedValue>>)]
    Get(Get),
    // Subscribe to entries; answered with a stream of `SubscribeItem`s.
    #[rpc(tx = mpsc::Sender<SubscribeItem>)]
    Subscribe(Subscribe),
    // Join peers on the gossip topic; answered with `()`.
    #[rpc(tx = oneshot::Sender<()>)]
    JoinPeers(JoinPeers),
    // Stop the actor; answered with `()`.
    #[rpc(tx = oneshot::Sender<()>)]
    Shutdown(Shutdown),
}
197
/// Restricts entries by scope, key range and timestamp range.
///
/// An entry matches only if it passes all three restrictions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Filter {
    /// None means no filtering by scope
    pub scope: Option<HashSet<PublicKey>>,
    /// Range of keys to include
    pub key: (Bound<Bytes>, Bound<Bytes>),
    /// Range of timestamps (in nanoseconds since epoch) to include
    pub timestamp: (Bound<u64>, Bound<u64>),
}
207
208impl Filter {
209    pub const ALL: Self = Self {
210        scope: None,
211        key: (Bound::Unbounded, Bound::Unbounded),
212        timestamp: (Bound::Unbounded, Bound::Unbounded),
213    };
214    pub const EMPTY: Self = Self {
215        scope: None,
216        key: (Bound::Unbounded, Bound::Excluded(Bytes::new())),
217        timestamp: (Bound::Unbounded, Bound::Excluded(0)),
218    };
219    /// Sets the scope filter to a single public key.
220    pub fn scope(self, scope: PublicKey) -> Self {
221        self.scopes(Some(scope))
222    }
223    /// Sets the scope filter to a set of public keys.
224    pub fn scopes(self, scope: impl IntoIterator<Item = PublicKey>) -> Self {
225        let scope = scope.into_iter().collect();
226        Self {
227            scope: Some(scope),
228            key: self.key,
229            timestamp: self.timestamp,
230        }
231    }
232    /// Sets the key filter to a single key.
233    pub fn key(self, key: impl Into<Bytes>) -> Self {
234        let key = key.into();
235        Self {
236            scope: self.scope,
237            key: (Bound::Included(key.clone()), Bound::Included(key)),
238            timestamp: self.timestamp,
239        }
240    }
241    /// Sets the key filter to a range of keys.
242    pub fn keys<I, V>(self, range: I) -> Self
243    where
244        I: RangeBounds<V>,
245        V: Clone + Into<Bytes>,
246    {
247        let start = range.start_bound().map(|x| x.clone().into());
248        let end = range.end_bound().map(|x| x.clone().into());
249        Self {
250            scope: self.scope,
251            key: (start, end),
252            timestamp: self.timestamp,
253        }
254    }
255    /// Sets the key filter to all keys with the given prefix.
256    pub fn key_prefix(self, prefix: impl Into<Bytes>) -> Self {
257        let prefix = prefix.into();
258        let mut end = prefix.to_vec();
259        let start = Bound::Included(prefix);
260        let end = if next_prefix(&mut end) {
261            Bound::Excluded(end.into())
262        } else {
263            Bound::Unbounded
264        };
265        Self {
266            scope: self.scope,
267            key: (start, end),
268            timestamp: self.timestamp,
269        }
270    }
271
272    /// Sets the timestamp filter to a range of system times.
273    pub fn timestamps(self, range: impl RangeBounds<SystemTime>) -> Self {
274        let start = range.start_bound().map(to_nanos);
275        let end = range.end_bound().map(to_nanos);
276        self.timestamps_nanos((start, end))
277    }
278
279    /// Sets the timestamp filter to a range of system times.
280    pub fn timestamps_nanos(self, range: impl RangeBounds<u64>) -> Self {
281        let start = range.start_bound().cloned();
282        let end = range.end_bound().cloned();
283        Self {
284            scope: self.scope,
285            key: self.key,
286            timestamp: (start, end),
287        }
288    }
289
290    /// Checks if the given entry matches the filter.
291    pub fn contains(&self, scope: &PublicKey, key: &[u8], timestamp: u64) -> bool {
292        self.contains_key(scope, key) && self.timestamp.contains(&timestamp)
293    }
294
295    /// Checks if the given scope and key match the filter, excluding timestamp.
296    pub fn contains_key(&self, scope: &PublicKey, key: &[u8]) -> bool {
297        if let Some(scopes) = &self.scope
298            && !scopes.contains(scope)
299        {
300            return false;
301        }
302        self.key.contains(key)
303    }
304}
305
/// Pending result of iterating over current entries; wraps the future that
/// resolves to the receiver of [`SubscribeItem`]s.
pub struct IterResult(BoxFuture<Result<mpsc::Receiver<SubscribeItem>, irpc::Error>>);
307
308impl IterResult {
309    pub async fn collect<C: Default + Extend<(PublicKey, Bytes, Bytes)>>(
310        self,
311    ) -> Result<C, irpc::Error> {
312        let mut rx = self.0.await?;
313        let mut items = C::default();
314        while let Some(SubscribeItem::Entry((scope, key, value))) = rx.recv().await? {
315            items.extend(Some((scope, key, value.value)));
316        }
317        Ok(items)
318    }
319}
320
/// Pending subscription; wraps the future that resolves to the receiver of
/// [`SubscribeItem`]s.
pub struct SubscribeResponse(BoxFuture<Result<mpsc::Receiver<SubscribeItem>, irpc::Error>>);
322
323impl SubscribeResponse {
324    /// Stream of entries from the subscription, as raw SubscribeResponse values.
325    pub fn stream_raw(
326        self,
327    ) -> impl n0_future::Stream<Item = Result<SubscribeItem, irpc::Error>> + Send + Sync + 'static
328    {
329        SyncStream::new(
330            async move {
331                let rx = self.0.await?;
332                Ok(rx.into_stream().map_err(irpc::Error::from))
333            }
334            .try_flatten_stream(),
335        )
336    }
337
338    /// Stream of entries from the subscription, without distinguishing current vs future.
339    pub fn stream(
340        self,
341    ) -> impl n0_future::Stream<Item = Result<Entry, irpc::Error>> + Send + Sync + 'static {
342        SyncStream::new(
343            async move {
344                let rx = self.0.await?;
345                Ok(rx
346                    .into_stream()
347                    .try_filter_map(|res| async move {
348                        match res {
349                            SubscribeItem::Entry(entry) => Ok(Some(entry)),
350                            SubscribeItem::Expired((_, _, _)) => Ok(None),
351                            SubscribeItem::CurrentDone => Ok(None),
352                        }
353                    })
354                    .map_err(irpc::Error::from))
355            }
356            .try_flatten_stream(),
357        )
358    }
359}
360
/// Handle for talking to the store; a thin wrapper around the irpc client
/// for the internal request protocol.
#[derive(Debug, Clone)]
pub struct Client(irpc::Client<Proto>);
363
/// A handle for writing values signed with one secret key.
///
/// The scope of all written values is the public key of that secret key.
#[derive(Debug, Clone)]
pub struct WriteScope {
    /// Client used to submit the signed values.
    api: Client,
    /// Key used to sign values.
    secret: SecretKey,
    /// Public key corresponding to `secret`; the scope of all writes.
    public: PublicKey,
}
370
371impl WriteScope {
372    pub fn scope(&self) -> PublicKey {
373        self.public
374    }
375
376    pub async fn put(
377        &self,
378        key: impl Into<Bytes>,
379        value: impl Into<Bytes>,
380    ) -> Result<(), irpc::Error> {
381        let key = key.into();
382        let value = value.into();
383        let timestamp = current_timestamp();
384        let signing_data = SigningData {
385            key: &key,
386            timestamp,
387            value: &value,
388        };
389        let signing_data_bytes = postcard::to_stdvec(&signing_data).expect("signing data to vec");
390        let signature = self.secret.sign(&signing_data_bytes);
391        let signed_value = SignedValue {
392            timestamp,
393            value: value.clone(),
394            signature: signature.to_bytes(),
395        };
396        self.api.put(self.public, key, signed_value).await
397    }
398}
399
400impl Client {
401    /// Create a local client. This requires a tokio runtime.
402    pub fn local(topic: GossipTopic, config: Config) -> Self {
403        let (tx, rx) = tokio::sync::mpsc::channel(32);
404        let actor = Actor::new(topic, rx, config);
405        tokio::spawn(actor.run());
406        Self(tx.into())
407    }
408
409    /// This isn't public because it does not verify the signature on the value.
410    async fn put(
411        &self,
412        scope: PublicKey,
413        key: Bytes,
414        value: SignedValue,
415    ) -> Result<(), irpc::Error> {
416        self.0.rpc(Put { scope, key, value }).await
417    }
418
419    /// Create a write scope that can put values signed by the given secret key.
420    pub fn write(&self, secret: SecretKey) -> WriteScope {
421        WriteScope {
422            api: self.clone(),
423            public: secret.public(),
424            secret,
425        }
426    }
427
428    pub async fn get(
429        &self,
430        scope: PublicKey,
431        key: impl Into<Bytes>,
432    ) -> Result<Option<Bytes>, irpc::Error> {
433        let value = self
434            .0
435            .rpc(Get {
436                scope,
437                key: key.into(),
438            })
439            .await?;
440        Ok(value.map(|sv| sv.value))
441    }
442
443    pub fn subscribe(&self) -> SubscribeResponse {
444        self.subscribe_with_opts(Subscribe {
445            mode: SubscribeMode::Both,
446            filter: Filter::ALL,
447        })
448    }
449
450    pub fn subscribe_with_opts(&self, subscribe: Subscribe) -> SubscribeResponse {
451        SubscribeResponse(Box::pin(self.0.server_streaming(subscribe, 32)))
452    }
453
454    pub fn iter_with_opts(&self, filter: Filter) -> IterResult {
455        let subscribe = Subscribe {
456            mode: SubscribeMode::Current,
457            filter,
458        };
459        IterResult(Box::pin(self.0.server_streaming(subscribe, 32)))
460    }
461
462    pub fn iter(&self) -> IterResult {
463        let subscribe = Subscribe {
464            mode: SubscribeMode::Current,
465            filter: Filter::ALL,
466        };
467        IterResult(Box::pin(self.0.server_streaming(subscribe, 32)))
468    }
469
470    pub fn join_peers(
471        &self,
472        peers: impl IntoIterator<Item = NodeId>,
473    ) -> impl n0_future::Future<Output = Result<(), irpc::Error>> {
474        let peers = JoinPeers {
475            peers: peers.into_iter().collect(),
476        };
477        self.0.rpc(peers)
478    }
479
480    pub async fn shutdown(&self) -> Result<(), irpc::Error> {
481        let _ = self.0.rpc(Shutdown).await?;
482        Ok(())
483    }
484}
485
/// The local view of the store.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct State {
    /// Current values: scope -> (key -> signed value), with keys kept in a
    /// sorted map per scope.
    current: rpds::HashTrieMapSync<PublicKey, rpds::RedBlackTreeMapSync<Bytes, SignedValue>>,
}
490
491impl State {
492    fn new() -> Self {
493        Self::default()
494    }
495
496    fn snapshot(&self) -> Self {
497        self.clone()
498    }
499
500    fn insert_signed_value(
501        &mut self,
502        scope: PublicKey,
503        key: Bytes,
504        value: SignedValue,
505    ) -> Result<(), InsertError> {
506        let signing_data = SigningData {
507            key: &key,
508            timestamp: value.timestamp,
509            value: &value.value,
510        };
511        let signing_data_bytes = postcard::to_stdvec(&signing_data).expect("signing data to vec");
512        let signature = ed25519_dalek::Signature::from_bytes(&value.signature);
513        scope.verify(&signing_data_bytes, &signature)?;
514        self.insert_signed_value_unverified(scope, key, value)
515    }
516
517    fn insert_signed_value_unverified(
518        &mut self,
519        scope: PublicKey,
520        key: Bytes,
521        value: SignedValue,
522    ) -> Result<(), InsertError> {
523        let per_node = if let Some(current) = self.current.get_mut(&scope) {
524            current
525        } else {
526            self.current.insert_mut(scope, Default::default());
527            self.current.get_mut(&scope).expect("just inserted")
528        };
529        match per_node.get_mut(&key) {
530            Some(existing) if existing.timestamp >= value.timestamp => {
531                return Err(insert_error::ValueTooOldSnafu {
532                    old: existing.timestamp,
533                    new: value.timestamp,
534                }
535                .build());
536            }
537            _ => {
538                per_node.insert_mut(key, value);
539            }
540        }
541        Ok(())
542    }
543
544    fn get(&self, scope: &PublicKey, key: &Bytes) -> Option<&SignedValue> {
545        self.current.get(scope).and_then(|m| m.get(key))
546    }
547
548    fn flatten_filtered(
549        &self,
550        filter: &Filter,
551    ) -> impl Iterator<Item = (&PublicKey, &Bytes, &SignedValue)> {
552        // first filter by scope using a full scan
553        let filtered_by_scope = self.current.iter().filter(|(scope, _)| {
554            filter
555                .scope
556                .as_ref()
557                .map(|scopes| scopes.contains(*scope))
558                .unwrap_or(true)
559        });
560        filtered_by_scope.flat_map(move |(scope, map)| {
561            // filter by key range using the tree structure
562            let filtered_by_key = map
563                .range(filter.key.clone())
564                .filter(move |(_, signed_value)| {
565                    filter.timestamp.contains(&signed_value.timestamp)
566                });
567            // filter by timestamp using a full scan of the remaining items
568            let filtered_by_timestamp = filtered_by_key.filter(move |(_, signed_value)| {
569                filter.timestamp.contains(&signed_value.timestamp)
570            });
571            // add the scope
572            filtered_by_timestamp.map(move |(key, signed_value)| (scope, key, signed_value))
573        })
574    }
575
576    fn flatten(&self) -> impl Iterator<Item = (&PublicKey, &Bytes, &SignedValue)> {
577        self.current.iter().flat_map(|(scope, map)| {
578            map.iter()
579                .map(move |(key, signed_value)| (scope, key, signed_value))
580        })
581    }
582}
583
/// The task that owns the store state and processes local requests and
/// gossip events.
struct Actor {
    /// Sending half of the gossip topic.
    sender: GossipSender,
    /// Receiving half of the gossip topic.
    receiver: GossipReceiver,
    /// Requests coming from local clients.
    rx: tokio::sync::mpsc::Receiver<Message>,
    /// Anti-entropy and expiry settings.
    config: Config,
    /// The current values.
    state: State,
    /// Channel on which inserted and expired entries are announced to subscriptions.
    broadcast_tx: broadcast::Sender<BroadcastItem>,
}
592
593impl Actor {
594    fn new(topic: GossipTopic, rx: tokio::sync::mpsc::Receiver<Message>, config: Config) -> Self {
595        let (sender, receiver) = topic.split();
596        let (broadcast_tx, _) = tokio::sync::broadcast::channel(32);
597        Self {
598            state: State::new(),
599            sender,
600            receiver,
601            rx,
602            broadcast_tx,
603            config,
604        }
605    }
606
607    fn horizon(&self) -> Option<u64> {
608        self.config
609            .expiry
610            .as_ref()
611            .map(|d| current_timestamp() - d.horizon.as_nanos() as u64)
612    }
613
614    fn apply_horizon(&mut self) -> Vec<(PublicKey, Bytes, u64)> {
615        let mut expired = Vec::new();
616        let Some(horizon) = self.horizon() else {
617            return expired;
618        };
619        for (scope, map) in self.state.current.iter() {
620            for (key, value) in map.iter() {
621                if value.timestamp < horizon {
622                    expired.push((*scope, key.clone(), value.timestamp));
623                }
624            }
625        }
626        let mut expired_scopes = HashSet::new();
627        for (scope, key, _) in &expired {
628            let entry = self.state.current.get_mut(scope).expect("just checked");
629            entry.remove_mut(key);
630            if entry.is_empty() {
631                expired_scopes.insert(*scope);
632            }
633        }
634        for scope in expired_scopes {
635            self.state.current.remove_mut(&scope);
636        }
637        expired
638    }
639
640    /// Publish all known values in random order over the gossip network, spaced out evenly over the given total duration.
641    async fn anti_entropy(
642        snapshot: State,
643        sender: GossipSender,
644        total: Duration,
645    ) -> Result<(), iroh_gossip::api::ApiError> {
646        trace!(
647            "Starting anti-entropy with {} items for {:?}",
648            snapshot.flatten().count(),
649            total
650        );
651        let mut to_publish = snapshot.flatten().collect::<Vec<_>>();
652        to_publish.shuffle(&mut rand::rng());
653        let n = to_publish.len();
654        if n == 0 {
655            tokio::time::sleep(total).await;
656            return Ok(());
657        }
658        let delay = total / (n as u32);
659        let mut buf = BytesMut::with_capacity(4096);
660        for (scope, key, signed_value) in to_publish {
661            let gossip_msg = GossipMessage::SignedValue(*scope, key.clone(), signed_value.clone());
662            let gossip_msg = postcard_ser(&gossip_msg, &mut buf);
663            trace!(
664                "Anti-entropy publishing key={:?} at={:?}",
665                key,
666                signed_value.timestamp / 1_000_000_000
667            );
668            sender.broadcast_neighbors(gossip_msg).await?;
669            tokio::time::sleep(delay).await;
670        }
671        Ok(())
672    }
673
674    async fn iter_current(
675        tx: &irpc::channel::mpsc::Sender<SubscribeItem>,
676        snapshot: &State,
677        filter: &Filter,
678    ) -> Result<(), irpc::Error> {
679        for (scope, key, signed_value) in snapshot.flatten_filtered(filter) {
680            tx.send(SubscribeItem::Entry((
681                *scope,
682                key.clone(),
683                signed_value.clone(),
684            )))
685            .await?;
686        }
687        Ok(())
688    }
689
690    async fn handle_subscribe(
691        tx: mpsc::Sender<SubscribeItem>,
692        filter: Filter,
693        current: Option<State>,
694        future: Option<tokio::sync::broadcast::Receiver<BroadcastItem>>,
695    ) {
696        if let Some(snapshot) = current
697            && Self::iter_current(&tx, &snapshot, &filter).await.is_err()
698        {
699            return;
700        }
701        let Some(mut broadcast_rx) = future else {
702            return;
703        };
704        // Indicate that current values are done, and we are now sending future values.
705        if tx.send(SubscribeItem::CurrentDone).await.is_err() {
706            return;
707        }
708        loop {
709            tokio::select! {
710                item = broadcast_rx.recv() => {
711                    let Ok(item) = item else {
712                        break;
713                    };
714                    if !item.contained_in(&filter) {
715                        continue;
716                    }
717                    if tx.send(item.into()).await.is_err() {
718                        break;
719                    }
720                }
721                _ = tx.closed() => {
722                    break;
723                }
724            }
725        }
726    }
727
728    async fn run(mut self) {
729        let mut tasks = FuturesUnordered::<n0_future::boxed::BoxFuture<()>>::new();
730        let mut buf = bytes::BytesMut::with_capacity(4096);
731        let anti_entropy = Self::anti_entropy(
732            self.state.snapshot(),
733            self.sender.clone(),
734            self.config.anti_entropy_interval,
735        );
736        let horizon_period = match self.horizon() {
737            Some(_) => Duration::from_secs(30),
738            None => Duration::MAX,
739        };
740        let apply_horizon = Box::pin(tokio::time::sleep(horizon_period));
741        tokio::pin!(anti_entropy, apply_horizon);
742        loop {
743            tokio::select! {
744                msg = self.rx.recv() => {
745                    trace!("Received local message {:?}", msg);
746                    let Some(msg) = msg else {
747                        break;
748                    };
749                    match msg {
750                        Message::Put(msg) => {
751                            self.state.insert_signed_value_unverified(msg.scope, msg.key.clone(), msg.value.clone())
752                                .expect("inserting local value should always work");
753                            let gossip_msg = GossipMessage::SignedValue(msg.scope, msg.key.clone(), msg.value.clone());
754                            let gossip_msg = postcard_ser(&gossip_msg, &mut buf);
755                            self.sender.broadcast(gossip_msg).await.ok();
756                            self.broadcast_tx.send(BroadcastItem::Entry((msg.scope, msg.key.clone(), msg.value.clone()))).ok();
757                            msg.tx.send(()).await.ok();
758                        }
759                        Message::Get(msg) => {
760                            let res = self.state.get(&msg.scope, &msg.key);
761                            msg.tx.send(res.cloned()).await.ok();
762                        }
763                        Message::Subscribe(msg) => {
764                            let broadcast_rx = self.broadcast_tx.subscribe();
765                            let filter = msg.filter.clone();
766                            let (current, future) = match msg.mode {
767                                SubscribeMode::Current => (Some(self.state.snapshot()), None),
768                                SubscribeMode::Future => (None, Some(broadcast_rx)),
769                                SubscribeMode::Both => (Some(self.state.snapshot()), Some(broadcast_rx)),
770                            };
771                            tasks.push(Box::pin(Self::handle_subscribe(msg.tx, filter, current, future)));
772                        }
773                        Message::JoinPeers(msg) => {
774                            let res = self.sender.join_peers(msg.peers.clone()).await;
775                            msg.tx.send(()).await.ok();
776                            if let Err(e) = res {
777                                error!("Error joining peers: {:?}", e);
778                                break;
779                            }
780                        }
781                        Message::Shutdown(msg) => {
782                            msg.tx.send(()).await.ok();
783                            break;
784                        }
785                    }
786                }
787                msg = self.receiver.next() => {
788                    trace!("Received gossip message {:?}", msg);
789                    let Some(msg) = msg else {
790                        error!("Gossip receiver closed");
791                        break;
792                    };
793                    let msg = match msg {
794                        Ok(msg) => msg,
795                        Err(cause) => {
796                            error!("Error receiving message: {:?}", cause);
797                            break;
798                        }
799                    };
800                    let msg = match msg {
801                        Event::Received(msg) => msg,
802                        Event::NeighborUp(peer) => {
803                            trace!("New peer {}, starting fast anti-entropy", peer.fmt_short());
804                            anti_entropy.set(Self::anti_entropy(self.state.snapshot(), self.sender.clone(), self.config.fast_anti_entropy_interval));
805                            continue;
806                        },
807                        Event::NeighborDown(peer) => {
808                            trace!("Peer down: {}, goodbye!", peer.fmt_short());
809                            continue;
810                        },
811                        e => {
812                            trace!("Ignoring event: {:?}", e);
813                            continue
814                        },
815                    };
816                    let msg = match postcard::from_bytes::<GossipMessage>(&msg.content) {
817                        Ok(msg) => msg,
818                        Err(e) => {
819                            trace!("Error deserializing gossip message: {:?}", e);
820                            continue;
821                        }
822                    };
823                    match msg {
824                        GossipMessage::SignedValue(scope, key, value) => {
825                            if let Some(horizon) = self.horizon()
826                                && value.timestamp < horizon {
827                                    trace!("Ignoring value key={:?} epoch={} below horizon", key, value.timestamp);
828                                    continue;
829                                }
830                            let id = scope.fmt_short();
831                            trace!(%id, "Received signed value key={:?} epoch={}", key, value.timestamp);
832                            let Ok(_) = self.state.insert_signed_value(scope, key.clone(), value.clone()) else {
833                                continue;
834                            };
835                            trace!(%id, "Broadcasting internally");
836                            self.broadcast_tx.send(BroadcastItem::Entry((scope, key, value))).ok();
837                        }
838                    }
839                }
840                res = &mut anti_entropy => {
841                    if let Err(e) = res {
842                        error!("Error in anti-entropy: {:?}", e);
843                        break;
844                    }
845                    // anti-entropy finished, start a new one
846                    anti_entropy.set(Self::anti_entropy(self.state.snapshot(), self.sender.clone(), self.config.anti_entropy_interval));
847                }
848                _ = &mut apply_horizon => {
849                    let expired = self.apply_horizon();
850                    for entry in expired {
851                        self.broadcast_tx.send(BroadcastItem::Expired(entry)).ok();
852                    }
853                    // schedule next horizon application
854                    apply_horizon.set(Box::pin(tokio::time::sleep(horizon_period)));
855                }
856                _ = tasks.next(), if !tasks.is_empty() => {}
857            }
858        }
859    }
860}
861
/// Configuration for the store.
#[derive(Debug, Clone)]
pub struct Config {
    /// Duration over which one regular round of republishing all known
    /// values is spread.
    pub anti_entropy_interval: Duration,
    /// Duration over which a round of republishing is spread when it is
    /// started because a new neighbor appeared.
    pub fast_anti_entropy_interval: Duration,
    /// Optional horizon duration. Values older than now - horizon are removed,
    /// and will not be re-added.
    pub expiry: Option<ExpiryConfig>,
}
870
/// Controls when values are considered expired and how often that is checked.
#[derive(Debug, Clone)]
pub struct ExpiryConfig {
    /// Age beyond which a value counts as expired.
    pub horizon: Duration,
    /// Period between two consecutive sweeps for expired values.
    pub check_interval: Duration,
}
878
879impl Config {
880    pub const DEBUG: Self = Self {
881        anti_entropy_interval: Duration::from_secs(30),
882        fast_anti_entropy_interval: Duration::from_secs(5),
883        expiry: Some(ExpiryConfig {
884            horizon: Duration::from_secs(30),
885            check_interval: Duration::from_secs(10),
886        }),
887    };
888}
889
890impl Default for Config {
891    fn default() -> Self {
892        Self {
893            // republish all known values every 5 minutes
894            anti_entropy_interval: Duration::from_secs(300),
895            // republish all known values every 10 seconds when we get a new peer
896            fast_anti_entropy_interval: Duration::from_secs(10),
897            // keep values for 24 hours, purge every hour
898            expiry: Some(ExpiryConfig {
899                horizon: Duration::from_secs(3600 * 24),
900                check_interval: Duration::from_secs(3600),
901            }),
902        }
903    }
904}
905
906pub mod util {
907    //! Utility functions for working with strings and nanosecond timestamps.
908    use std::{
909        fmt,
910        time::{Duration, SystemTime},
911    };
912
913    use bytes::{Bytes, BytesMut};
914    use serde::Serialize;
915
916    /// We use nanoseconds since epoch for timestamps.
917    ///
918    /// This will overflow in the year 2554. If that ever becomes a problem, we can
919    /// switch to u128.
920    ///
921    /// It will also not work for dates before the unix epoch, but that's not a
922    /// problem for our use case.
923    pub fn to_nanos(t: &SystemTime) -> u64 {
924        t.duration_since(SystemTime::UNIX_EPOCH)
925            .expect("system time before unix epoch")
926            .as_nanos()
927            .try_into()
928            .expect("u64 nanos")
929    }
930
931    pub fn current_timestamp() -> u64 {
932        to_nanos(&SystemTime::now())
933    }
934
935    pub fn from_nanos(nanos: u64) -> SystemTime {
936        SystemTime::UNIX_EPOCH + Duration::from_nanos(nanos)
937    }
938
939    pub(crate) fn postcard_ser<T: Serialize>(value: &T, buf: &mut BytesMut) -> Bytes {
940        buf.clear();
941        postcard::to_extend(value, ExtendBytesMut(buf)).expect("value to buf");
942        buf.split().into()
943    }
944
945    struct ExtendBytesMut<'a>(&'a mut BytesMut);
946
947    impl<'a> Extend<u8> for ExtendBytesMut<'a> {
948        fn extend<T: IntoIterator<Item = u8>>(&mut self, iter: T) {
949            for b in iter {
950                self.0.extend_from_slice(&[b]);
951            }
952        }
953    }
954
955    pub fn next_prefix(bytes: &mut [u8]) -> bool {
956        for byte in bytes.iter_mut().rev() {
957            if *byte < 255 {
958                *byte += 1;
959                return true;
960            }
961            *byte = 0;
962        }
963        false
964    }
965
966    pub fn format_bytes(bytes: &[u8]) -> String {
967        if bytes.is_empty() {
968            return "\"\"".to_string();
969        }
970        let Ok(s) = std::str::from_utf8(bytes) else {
971            return hex::encode(bytes);
972        };
973        if s.chars()
974            .any(|c| c.is_control() && c != '\n' && c != '\t' && c != '\r')
975        {
976            return hex::encode(bytes);
977        }
978        format!("\"{}\"", escape_string(s))
979    }
980
981    pub fn escape_string(s: &str) -> String {
982        s.chars()
983            .map(|c| match c {
984                '"' => "\\\"".to_string(),
985                '\\' => "\\\\".to_string(),
986                '\n' => "\\n".to_string(),
987                '\t' => "\\t".to_string(),
988                '\r' => "\\r".to_string(),
989                c => c.to_string(),
990            })
991            .collect()
992    }
993    pub(crate) struct DD<T: fmt::Display>(pub T);
994
995    impl<T: fmt::Display> fmt::Debug for DD<T> {
996        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
997            fmt::Display::fmt(&self.0, f)
998        }
999    }
1000}
1001
1002#[cfg(feature = "filter-parser")]
1003mod peg_parser {
1004    use std::{
1005        collections::HashSet,
1006        fmt::{self, Display},
1007        ops::Bound,
1008        str::FromStr,
1009        time::SystemTime,
1010    };
1011
1012    use bytes::Bytes;
1013    use iroh::PublicKey;
1014
1015    use crate::{
1016        Filter,
1017        util::{format_bytes, from_nanos},
1018    };
1019
1020    impl Display for Filter {
1021        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1022            let mut parts = Vec::new();
1023
1024            // Handle scope
1025            if let Some(scopes) = &self.scope
1026                && !scopes.is_empty()
1027            {
1028                let scope_list: Vec<String> = scopes.iter().map(|k| k.to_string()).collect();
1029                parts.push(format!("scope={{{}}}", scope_list.join(",")));
1030            }
1031
1032            // Handle key range
1033            match &self.key {
1034                (Bound::Unbounded, Bound::Unbounded) => {} // No key filter
1035                (Bound::Included(start), Bound::Included(end)) if start == end => {
1036                    // Single key
1037                    parts.push(format!("key={}", format_bytes(start)));
1038                }
1039                (start_bound, end_bound) => {
1040                    // Range
1041                    let start_str = match start_bound {
1042                        Bound::Unbounded => String::new(),
1043                        Bound::Included(bytes) | Bound::Excluded(bytes) => format_bytes(bytes),
1044                    };
1045
1046                    let end_str = match end_bound {
1047                        Bound::Unbounded => String::new(),
1048                        Bound::Included(bytes) | Bound::Excluded(bytes) => format_bytes(bytes),
1049                    };
1050
1051                    let op = match end_bound {
1052                        Bound::Included(_) => "..=",
1053                        _ => "..",
1054                    };
1055
1056                    parts.push(format!("key={start_str}{op}{end_str}"));
1057                }
1058            }
1059
1060            // Handle timestamp range
1061            match &self.timestamp {
1062                (Bound::Unbounded, Bound::Unbounded) => {} // No time filter
1063                (start_bound, end_bound) => {
1064                    let start_str = match start_bound {
1065                        Bound::Unbounded => String::new(),
1066                        Bound::Included(time) | Bound::Excluded(time) => {
1067                            use chrono::{DateTime, Utc};
1068                            let dt: DateTime<Utc> = (from_nanos(*time)).into();
1069                            dt.to_rfc3339()
1070                        }
1071                    };
1072
1073                    let end_str = match end_bound {
1074                        Bound::Unbounded => String::new(),
1075                        Bound::Included(time) | Bound::Excluded(time) => {
1076                            use chrono::{DateTime, Utc};
1077                            let dt: DateTime<Utc> = (from_nanos(*time)).into();
1078                            dt.to_rfc3339()
1079                        }
1080                    };
1081
1082                    let op = match end_bound {
1083                        Bound::Included(_) => "..=",
1084                        _ => "..",
1085                    };
1086
1087                    parts.push(format!("time={start_str}{op}{end_str}"));
1088                }
1089            }
1090
1091            write!(f, "{}", parts.join(" & "))
1092        }
1093    }
1094
1095    impl FromStr for Filter {
1096        type Err = String;
1097
1098        fn from_str(s: &str) -> Result<Self, Self::Err> {
1099            parse_filter(s)
1100        }
1101    }
1102
1103    peg::parser! {
1104        grammar filter_parser() for str {
1105            pub rule filter() -> Filter
1106                = _ parts:(filter_part() ** (_ "&" _)) _ {
1107                    let mut filter = Filter::ALL;
1108                    for part in parts {
1109                        match part {
1110                            FilterPart::Scope(keys) => filter = filter.scopes(keys),
1111                            FilterPart::Key(range) => filter = filter.keys(range),
1112                            FilterPart::KeyPrefix(prefix) => filter = filter.key_prefix(prefix),
1113                            FilterPart::Time(range) => filter = filter.timestamps(range),
1114                        }
1115                    }
1116                    filter
1117                }
1118
1119            rule filter_part() -> FilterPart
1120                = scope_part() / key_part() / time_part()
1121
1122            rule scope_part() -> FilterPart
1123                = "scope" _ "=" _ "{" _ keys:(hex_value() ** (_ "," _)) _ "}" {
1124                    let keys = keys.into_iter()
1125                        .filter_map(|s| PublicKey::from_str(&s).ok())
1126                        .collect();
1127                    FilterPart::Scope(keys)
1128                }
1129
1130            rule key_part() -> FilterPart
1131                = "key" _ "=" _ result:(key_prefix() / key_range() / key_single()) { result }
1132
1133            rule key_prefix() -> FilterPart
1134                = v:value() _ "*" { FilterPart::KeyPrefix(v) }
1135
1136            rule key_range() -> FilterPart
1137                = start:value()? _ op:range_op() _ end:value()? {
1138                    let start_bound = start.map_or(Bound::Unbounded, Bound::Included);
1139                    let end_bound = end.map_or(Bound::Unbounded, |v| {
1140                        if op { Bound::Included(v) } else { Bound::Excluded(v) }
1141                    });
1142                    FilterPart::Key((start_bound, end_bound))
1143                }
1144
1145            rule key_single() -> FilterPart
1146                = v:value() { FilterPart::Key((Bound::Included(v.clone()), Bound::Included(v))) }
1147
1148            rule time_part() -> FilterPart
1149                = "time" _ "=" _ range:time_range() { FilterPart::Time(range) }
1150
1151            rule time_range() -> (Bound<SystemTime>, Bound<SystemTime>)
1152                = start:timestamp()? _ op:range_op() _ end:timestamp()? {
1153                    let start_bound = start.flatten().map_or(Bound::Unbounded, Bound::Included);
1154                    let end_bound = end.flatten().map_or(Bound::Unbounded, |v| {
1155                        if op { Bound::Included(v) } else { Bound::Excluded(v) }
1156                    });
1157                    (start_bound, end_bound)
1158                }
1159
1160            rule range_op() -> bool
1161                = "..=" { true } / ".." { false }
1162
1163            rule value() -> Bytes
1164                = quoted_string() / hex_bytes()
1165
1166            rule quoted_string() -> Bytes
1167                = "\"" s:string_content() "\"" { Bytes::from(s) }
1168
1169            rule string_content() -> String
1170                = chars:string_char()* { chars.into_iter().collect() }
1171
1172            rule string_char() -> char
1173                = "\\\\" { '\\' }
1174                / "\\\"" { '"' }
1175                / "\\n" { '\n' }
1176                / "\\t" { '\t' }
1177                / "\\r" { '\r' }
1178                / c:$(!['\"' | '\\'] [_]) { c.chars().next().unwrap() }
1179
1180            rule hex_bytes() -> Bytes
1181                = s:$(['0'..='9' | 'a'..='f' | 'A'..='F']+) {?
1182                    hex::decode(s)
1183                        .map(Bytes::from)
1184                        .map_err(|_| "invalid hex")
1185                }
1186
1187            rule hex_value() -> String
1188                = s:$(['0'..='9' | 'a'..='f' | 'A'..='F']+) { s.to_string() }
1189
1190            rule timestamp() -> Option<SystemTime>
1191                = s:timestamp_chars() {
1192                    use chrono::{DateTime, Utc};
1193                    DateTime::parse_from_rfc3339(s.trim())
1194                        .map(|dt| dt.with_timezone(&Utc).into()).ok()
1195                }
1196
1197            rule timestamp_chars() -> &'input str
1198                = $([^ '&' | ' ' | '\t' | '\n' | '\r' | '.']+)
1199
1200            rule _() = [' ' | '\t' | '\n' | '\r']*
1201        }
1202    }
1203
1204    #[derive(Debug)]
1205    enum FilterPart {
1206        Scope(HashSet<PublicKey>),
1207        Key((Bound<Bytes>, Bound<Bytes>)),
1208        KeyPrefix(Bytes),
1209        Time((Bound<SystemTime>, Bound<SystemTime>)),
1210    }
1211
1212    pub fn parse_filter(input: &str) -> Result<Filter, String> {
1213        filter_parser::filter(input).map_err(|e| e.to_string())
1214    }
1215}