1use std::{
19 collections::HashSet,
20 ops::{Bound, RangeBounds},
21 time::{Duration, SystemTime},
22};
23
24use bytes::{Bytes, BytesMut};
25use iroh::{NodeId, PublicKey, SecretKey};
26use iroh_gossip::api::{Event, GossipReceiver, GossipSender, GossipTopic};
27use irpc::{
28 channel::{mpsc, oneshot},
29 rpc_requests,
30};
31use n0_future::{FuturesUnordered, StreamExt, TryFutureExt, TryStreamExt, boxed::BoxFuture};
32pub use proto::SignedValue;
33use proto::{GossipMessage, SigningData};
34use rand::seq::SliceRandom;
35use serde::{Deserialize, Serialize};
36use snafu::Snafu;
37use sync_wrapper::SyncStream;
38use tokio::sync::broadcast;
39use tracing::{error, trace};
40use util::{current_timestamp, next_prefix, postcard_ser, to_nanos};
41
42type Entry = (PublicKey, Bytes, SignedValue);
43
44pub mod proto {
45 use std::time::SystemTime;
47
48 use bytes::Bytes;
49 use iroh::PublicKey;
50 use serde::{Deserialize, Serialize};
51 use serde_big_array::BigArray;
52
53 use crate::util::{DD, from_nanos};
54 #[derive(Clone, Serialize, Deserialize)]
55 pub struct SignedValue {
56 pub timestamp: u64,
58 pub value: Bytes,
60 #[serde(with = "BigArray")]
62 pub signature: [u8; 64],
63 }
64
65 impl std::fmt::Debug for SignedValue {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_tuple("SignedValue")
68 .field(&self.timestamp)
69 .field(&self.value)
70 .field(&DD(hex::encode(self.signature)))
71 .finish()
72 }
73 }
74
75 impl SignedValue {
76 pub fn system_time(&self) -> SystemTime {
77 from_nanos(self.timestamp)
78 }
79 }
80
81 #[derive(Debug, Serialize, Deserialize)]
82 pub enum GossipMessage {
83 SignedValue(PublicKey, Bytes, SignedValue),
84 }
85
86 #[derive(Debug, Clone, Serialize, Deserialize)]
87 pub(crate) struct SigningData<'a> {
88 pub key: &'a [u8],
89 pub timestamp: u64,
90 pub value: &'a [u8],
91 }
92}
93
94#[derive(Debug, Snafu)]
95#[snafu(visibility(pub(crate)))]
96#[snafu(module)]
97pub enum InsertError {
98 #[snafu(transparent)]
99 Signature {
100 source: ed25519_dalek::SignatureError,
101 },
102 #[snafu(display("Value too old: existing timestamp {}, new timestamp {}", old, new))]
103 ValueTooOld { old: u64, new: u64 },
104}
105
106#[derive(Debug, Serialize, Deserialize)]
107struct Put {
108 pub scope: PublicKey,
109 pub key: Bytes,
110 pub value: SignedValue,
111}
112
113#[derive(Debug, Serialize, Deserialize)]
114struct Get {
115 pub scope: PublicKey,
116 pub key: Bytes,
117}
118
119#[derive(Debug, Serialize, Deserialize)]
120pub enum SubscribeMode {
121 Current,
123 Future,
125 Both,
127}
128
129#[derive(Debug, Serialize, Deserialize, Clone)]
130pub enum SubscribeItem {
131 Entry(Entry),
133 CurrentDone,
137 Expired((PublicKey, Bytes, u64)),
139}
140
141#[derive(Clone)]
142pub(crate) enum BroadcastItem {
143 Entry(Entry),
145 Expired((PublicKey, Bytes, u64)),
147}
148
149impl From<BroadcastItem> for SubscribeItem {
150 fn from(item: BroadcastItem) -> Self {
151 match item {
152 BroadcastItem::Entry(entry) => SubscribeItem::Entry(entry),
153 BroadcastItem::Expired(entry) => SubscribeItem::Expired(entry),
154 }
155 }
156}
157
158impl BroadcastItem {
159 fn contained_in(&self, filter: &Filter) -> bool {
160 match self {
161 BroadcastItem::Entry((scope, key, value)) => {
162 filter.contains(scope, key, value.timestamp)
163 }
164 BroadcastItem::Expired((scope, key, _)) => filter.contains_key(scope, key),
165 }
166 }
167}
168
169#[derive(Debug, Serialize, Deserialize)]
170pub struct Subscribe {
171 pub mode: SubscribeMode,
172 pub filter: Filter,
173}
174
175#[derive(Debug, Serialize, Deserialize)]
176pub(crate) struct JoinPeers {
177 pub peers: Vec<NodeId>,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181pub(crate) struct Shutdown;
182
183#[derive(Debug, Serialize, Deserialize)]
184#[rpc_requests(message = Message)]
185enum Proto {
186 #[rpc(tx = oneshot::Sender<()>)]
187 Put(Put),
188 #[rpc(tx = oneshot::Sender<Option<SignedValue>>)]
189 Get(Get),
190 #[rpc(tx = mpsc::Sender<SubscribeItem>)]
191 Subscribe(Subscribe),
192 #[rpc(tx = oneshot::Sender<()>)]
193 JoinPeers(JoinPeers),
194 #[rpc(tx = oneshot::Sender<()>)]
195 Shutdown(Shutdown),
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct Filter {
200 pub scope: Option<HashSet<PublicKey>>,
202 pub key: (Bound<Bytes>, Bound<Bytes>),
204 pub timestamp: (Bound<u64>, Bound<u64>),
206}
207
208impl Filter {
209 pub const ALL: Self = Self {
210 scope: None,
211 key: (Bound::Unbounded, Bound::Unbounded),
212 timestamp: (Bound::Unbounded, Bound::Unbounded),
213 };
214 pub const EMPTY: Self = Self {
215 scope: None,
216 key: (Bound::Unbounded, Bound::Excluded(Bytes::new())),
217 timestamp: (Bound::Unbounded, Bound::Excluded(0)),
218 };
219 pub fn scope(self, scope: PublicKey) -> Self {
221 self.scopes(Some(scope))
222 }
223 pub fn scopes(self, scope: impl IntoIterator<Item = PublicKey>) -> Self {
225 let scope = scope.into_iter().collect();
226 Self {
227 scope: Some(scope),
228 key: self.key,
229 timestamp: self.timestamp,
230 }
231 }
232 pub fn key(self, key: impl Into<Bytes>) -> Self {
234 let key = key.into();
235 Self {
236 scope: self.scope,
237 key: (Bound::Included(key.clone()), Bound::Included(key)),
238 timestamp: self.timestamp,
239 }
240 }
241 pub fn keys<I, V>(self, range: I) -> Self
243 where
244 I: RangeBounds<V>,
245 V: Clone + Into<Bytes>,
246 {
247 let start = range.start_bound().map(|x| x.clone().into());
248 let end = range.end_bound().map(|x| x.clone().into());
249 Self {
250 scope: self.scope,
251 key: (start, end),
252 timestamp: self.timestamp,
253 }
254 }
255 pub fn key_prefix(self, prefix: impl Into<Bytes>) -> Self {
257 let prefix = prefix.into();
258 let mut end = prefix.to_vec();
259 let start = Bound::Included(prefix);
260 let end = if next_prefix(&mut end) {
261 Bound::Excluded(end.into())
262 } else {
263 Bound::Unbounded
264 };
265 Self {
266 scope: self.scope,
267 key: (start, end),
268 timestamp: self.timestamp,
269 }
270 }
271
272 pub fn timestamps(self, range: impl RangeBounds<SystemTime>) -> Self {
274 let start = range.start_bound().map(to_nanos);
275 let end = range.end_bound().map(to_nanos);
276 self.timestamps_nanos((start, end))
277 }
278
279 pub fn timestamps_nanos(self, range: impl RangeBounds<u64>) -> Self {
281 let start = range.start_bound().cloned();
282 let end = range.end_bound().cloned();
283 Self {
284 scope: self.scope,
285 key: self.key,
286 timestamp: (start, end),
287 }
288 }
289
290 pub fn contains(&self, scope: &PublicKey, key: &[u8], timestamp: u64) -> bool {
292 self.contains_key(scope, key) && self.timestamp.contains(×tamp)
293 }
294
295 pub fn contains_key(&self, scope: &PublicKey, key: &[u8]) -> bool {
297 if let Some(scopes) = &self.scope
298 && !scopes.contains(scope)
299 {
300 return false;
301 }
302 self.key.contains(key)
303 }
304}
305
306pub struct IterResult(BoxFuture<Result<mpsc::Receiver<SubscribeItem>, irpc::Error>>);
307
308impl IterResult {
309 pub async fn collect<C: Default + Extend<(PublicKey, Bytes, Bytes)>>(
310 self,
311 ) -> Result<C, irpc::Error> {
312 let mut rx = self.0.await?;
313 let mut items = C::default();
314 while let Some(SubscribeItem::Entry((scope, key, value))) = rx.recv().await? {
315 items.extend(Some((scope, key, value.value)));
316 }
317 Ok(items)
318 }
319}
320
321pub struct SubscribeResponse(BoxFuture<Result<mpsc::Receiver<SubscribeItem>, irpc::Error>>);
322
323impl SubscribeResponse {
324 pub fn stream_raw(
326 self,
327 ) -> impl n0_future::Stream<Item = Result<SubscribeItem, irpc::Error>> + Send + Sync + 'static
328 {
329 SyncStream::new(
330 async move {
331 let rx = self.0.await?;
332 Ok(rx.into_stream().map_err(irpc::Error::from))
333 }
334 .try_flatten_stream(),
335 )
336 }
337
338 pub fn stream(
340 self,
341 ) -> impl n0_future::Stream<Item = Result<Entry, irpc::Error>> + Send + Sync + 'static {
342 SyncStream::new(
343 async move {
344 let rx = self.0.await?;
345 Ok(rx
346 .into_stream()
347 .try_filter_map(|res| async move {
348 match res {
349 SubscribeItem::Entry(entry) => Ok(Some(entry)),
350 SubscribeItem::Expired((_, _, _)) => Ok(None),
351 SubscribeItem::CurrentDone => Ok(None),
352 }
353 })
354 .map_err(irpc::Error::from))
355 }
356 .try_flatten_stream(),
357 )
358 }
359}
360
361#[derive(Debug, Clone)]
362pub struct Client(irpc::Client<Proto>);
363
364#[derive(Debug, Clone)]
365pub struct WriteScope {
366 api: Client,
367 secret: SecretKey,
368 public: PublicKey,
369}
370
371impl WriteScope {
372 pub fn scope(&self) -> PublicKey {
373 self.public
374 }
375
376 pub async fn put(
377 &self,
378 key: impl Into<Bytes>,
379 value: impl Into<Bytes>,
380 ) -> Result<(), irpc::Error> {
381 let key = key.into();
382 let value = value.into();
383 let timestamp = current_timestamp();
384 let signing_data = SigningData {
385 key: &key,
386 timestamp,
387 value: &value,
388 };
389 let signing_data_bytes = postcard::to_stdvec(&signing_data).expect("signing data to vec");
390 let signature = self.secret.sign(&signing_data_bytes);
391 let signed_value = SignedValue {
392 timestamp,
393 value: value.clone(),
394 signature: signature.to_bytes(),
395 };
396 self.api.put(self.public, key, signed_value).await
397 }
398}
399
400impl Client {
401 pub fn local(topic: GossipTopic, config: Config) -> Self {
403 let (tx, rx) = tokio::sync::mpsc::channel(32);
404 let actor = Actor::new(topic, rx, config);
405 tokio::spawn(actor.run());
406 Self(tx.into())
407 }
408
409 async fn put(
411 &self,
412 scope: PublicKey,
413 key: Bytes,
414 value: SignedValue,
415 ) -> Result<(), irpc::Error> {
416 self.0.rpc(Put { scope, key, value }).await
417 }
418
419 pub fn write(&self, secret: SecretKey) -> WriteScope {
421 WriteScope {
422 api: self.clone(),
423 public: secret.public(),
424 secret,
425 }
426 }
427
428 pub async fn get(
429 &self,
430 scope: PublicKey,
431 key: impl Into<Bytes>,
432 ) -> Result<Option<Bytes>, irpc::Error> {
433 let value = self
434 .0
435 .rpc(Get {
436 scope,
437 key: key.into(),
438 })
439 .await?;
440 Ok(value.map(|sv| sv.value))
441 }
442
443 pub fn subscribe(&self) -> SubscribeResponse {
444 self.subscribe_with_opts(Subscribe {
445 mode: SubscribeMode::Both,
446 filter: Filter::ALL,
447 })
448 }
449
450 pub fn subscribe_with_opts(&self, subscribe: Subscribe) -> SubscribeResponse {
451 SubscribeResponse(Box::pin(self.0.server_streaming(subscribe, 32)))
452 }
453
454 pub fn iter_with_opts(&self, filter: Filter) -> IterResult {
455 let subscribe = Subscribe {
456 mode: SubscribeMode::Current,
457 filter,
458 };
459 IterResult(Box::pin(self.0.server_streaming(subscribe, 32)))
460 }
461
462 pub fn iter(&self) -> IterResult {
463 let subscribe = Subscribe {
464 mode: SubscribeMode::Current,
465 filter: Filter::ALL,
466 };
467 IterResult(Box::pin(self.0.server_streaming(subscribe, 32)))
468 }
469
470 pub fn join_peers(
471 &self,
472 peers: impl IntoIterator<Item = NodeId>,
473 ) -> impl n0_future::Future<Output = Result<(), irpc::Error>> {
474 let peers = JoinPeers {
475 peers: peers.into_iter().collect(),
476 };
477 self.0.rpc(peers)
478 }
479
480 pub async fn shutdown(&self) -> Result<(), irpc::Error> {
481 let _ = self.0.rpc(Shutdown).await?;
482 Ok(())
483 }
484}
485
486#[derive(Debug, Clone, Serialize, Deserialize, Default)]
487struct State {
488 current: rpds::HashTrieMapSync<PublicKey, rpds::RedBlackTreeMapSync<Bytes, SignedValue>>,
489}
490
491impl State {
492 fn new() -> Self {
493 Self::default()
494 }
495
496 fn snapshot(&self) -> Self {
497 self.clone()
498 }
499
500 fn insert_signed_value(
501 &mut self,
502 scope: PublicKey,
503 key: Bytes,
504 value: SignedValue,
505 ) -> Result<(), InsertError> {
506 let signing_data = SigningData {
507 key: &key,
508 timestamp: value.timestamp,
509 value: &value.value,
510 };
511 let signing_data_bytes = postcard::to_stdvec(&signing_data).expect("signing data to vec");
512 let signature = ed25519_dalek::Signature::from_bytes(&value.signature);
513 scope.verify(&signing_data_bytes, &signature)?;
514 self.insert_signed_value_unverified(scope, key, value)
515 }
516
517 fn insert_signed_value_unverified(
518 &mut self,
519 scope: PublicKey,
520 key: Bytes,
521 value: SignedValue,
522 ) -> Result<(), InsertError> {
523 let per_node = if let Some(current) = self.current.get_mut(&scope) {
524 current
525 } else {
526 self.current.insert_mut(scope, Default::default());
527 self.current.get_mut(&scope).expect("just inserted")
528 };
529 match per_node.get_mut(&key) {
530 Some(existing) if existing.timestamp >= value.timestamp => {
531 return Err(insert_error::ValueTooOldSnafu {
532 old: existing.timestamp,
533 new: value.timestamp,
534 }
535 .build());
536 }
537 _ => {
538 per_node.insert_mut(key, value);
539 }
540 }
541 Ok(())
542 }
543
544 fn get(&self, scope: &PublicKey, key: &Bytes) -> Option<&SignedValue> {
545 self.current.get(scope).and_then(|m| m.get(key))
546 }
547
548 fn flatten_filtered(
549 &self,
550 filter: &Filter,
551 ) -> impl Iterator<Item = (&PublicKey, &Bytes, &SignedValue)> {
552 let filtered_by_scope = self.current.iter().filter(|(scope, _)| {
554 filter
555 .scope
556 .as_ref()
557 .map(|scopes| scopes.contains(*scope))
558 .unwrap_or(true)
559 });
560 filtered_by_scope.flat_map(move |(scope, map)| {
561 let filtered_by_key = map
563 .range(filter.key.clone())
564 .filter(move |(_, signed_value)| {
565 filter.timestamp.contains(&signed_value.timestamp)
566 });
567 let filtered_by_timestamp = filtered_by_key.filter(move |(_, signed_value)| {
569 filter.timestamp.contains(&signed_value.timestamp)
570 });
571 filtered_by_timestamp.map(move |(key, signed_value)| (scope, key, signed_value))
573 })
574 }
575
576 fn flatten(&self) -> impl Iterator<Item = (&PublicKey, &Bytes, &SignedValue)> {
577 self.current.iter().flat_map(|(scope, map)| {
578 map.iter()
579 .map(move |(key, signed_value)| (scope, key, signed_value))
580 })
581 }
582}
583
584struct Actor {
585 sender: GossipSender,
586 receiver: GossipReceiver,
587 rx: tokio::sync::mpsc::Receiver<Message>,
588 config: Config,
589 state: State,
590 broadcast_tx: broadcast::Sender<BroadcastItem>,
591}
592
593impl Actor {
594 fn new(topic: GossipTopic, rx: tokio::sync::mpsc::Receiver<Message>, config: Config) -> Self {
595 let (sender, receiver) = topic.split();
596 let (broadcast_tx, _) = tokio::sync::broadcast::channel(32);
597 Self {
598 state: State::new(),
599 sender,
600 receiver,
601 rx,
602 broadcast_tx,
603 config,
604 }
605 }
606
607 fn horizon(&self) -> Option<u64> {
608 self.config
609 .expiry
610 .as_ref()
611 .map(|d| current_timestamp() - d.horizon.as_nanos() as u64)
612 }
613
614 fn apply_horizon(&mut self) -> Vec<(PublicKey, Bytes, u64)> {
615 let mut expired = Vec::new();
616 let Some(horizon) = self.horizon() else {
617 return expired;
618 };
619 for (scope, map) in self.state.current.iter() {
620 for (key, value) in map.iter() {
621 if value.timestamp < horizon {
622 expired.push((*scope, key.clone(), value.timestamp));
623 }
624 }
625 }
626 let mut expired_scopes = HashSet::new();
627 for (scope, key, _) in &expired {
628 let entry = self.state.current.get_mut(scope).expect("just checked");
629 entry.remove_mut(key);
630 if entry.is_empty() {
631 expired_scopes.insert(*scope);
632 }
633 }
634 for scope in expired_scopes {
635 self.state.current.remove_mut(&scope);
636 }
637 expired
638 }
639
640 async fn anti_entropy(
642 snapshot: State,
643 sender: GossipSender,
644 total: Duration,
645 ) -> Result<(), iroh_gossip::api::ApiError> {
646 trace!(
647 "Starting anti-entropy with {} items for {:?}",
648 snapshot.flatten().count(),
649 total
650 );
651 let mut to_publish = snapshot.flatten().collect::<Vec<_>>();
652 to_publish.shuffle(&mut rand::rng());
653 let n = to_publish.len();
654 if n == 0 {
655 tokio::time::sleep(total).await;
656 return Ok(());
657 }
658 let delay = total / (n as u32);
659 let mut buf = BytesMut::with_capacity(4096);
660 for (scope, key, signed_value) in to_publish {
661 let gossip_msg = GossipMessage::SignedValue(*scope, key.clone(), signed_value.clone());
662 let gossip_msg = postcard_ser(&gossip_msg, &mut buf);
663 trace!(
664 "Anti-entropy publishing key={:?} at={:?}",
665 key,
666 signed_value.timestamp / 1_000_000_000
667 );
668 sender.broadcast_neighbors(gossip_msg).await?;
669 tokio::time::sleep(delay).await;
670 }
671 Ok(())
672 }
673
674 async fn iter_current(
675 tx: &irpc::channel::mpsc::Sender<SubscribeItem>,
676 snapshot: &State,
677 filter: &Filter,
678 ) -> Result<(), irpc::Error> {
679 for (scope, key, signed_value) in snapshot.flatten_filtered(filter) {
680 tx.send(SubscribeItem::Entry((
681 *scope,
682 key.clone(),
683 signed_value.clone(),
684 )))
685 .await?;
686 }
687 Ok(())
688 }
689
690 async fn handle_subscribe(
691 tx: mpsc::Sender<SubscribeItem>,
692 filter: Filter,
693 current: Option<State>,
694 future: Option<tokio::sync::broadcast::Receiver<BroadcastItem>>,
695 ) {
696 if let Some(snapshot) = current
697 && Self::iter_current(&tx, &snapshot, &filter).await.is_err()
698 {
699 return;
700 }
701 let Some(mut broadcast_rx) = future else {
702 return;
703 };
704 if tx.send(SubscribeItem::CurrentDone).await.is_err() {
706 return;
707 }
708 loop {
709 tokio::select! {
710 item = broadcast_rx.recv() => {
711 let Ok(item) = item else {
712 break;
713 };
714 if !item.contained_in(&filter) {
715 continue;
716 }
717 if tx.send(item.into()).await.is_err() {
718 break;
719 }
720 }
721 _ = tx.closed() => {
722 break;
723 }
724 }
725 }
726 }
727
728 async fn run(mut self) {
729 let mut tasks = FuturesUnordered::<n0_future::boxed::BoxFuture<()>>::new();
730 let mut buf = bytes::BytesMut::with_capacity(4096);
731 let anti_entropy = Self::anti_entropy(
732 self.state.snapshot(),
733 self.sender.clone(),
734 self.config.anti_entropy_interval,
735 );
736 let horizon_period = match self.horizon() {
737 Some(_) => Duration::from_secs(30),
738 None => Duration::MAX,
739 };
740 let apply_horizon = Box::pin(tokio::time::sleep(horizon_period));
741 tokio::pin!(anti_entropy, apply_horizon);
742 loop {
743 tokio::select! {
744 msg = self.rx.recv() => {
745 trace!("Received local message {:?}", msg);
746 let Some(msg) = msg else {
747 break;
748 };
749 match msg {
750 Message::Put(msg) => {
751 self.state.insert_signed_value_unverified(msg.scope, msg.key.clone(), msg.value.clone())
752 .expect("inserting local value should always work");
753 let gossip_msg = GossipMessage::SignedValue(msg.scope, msg.key.clone(), msg.value.clone());
754 let gossip_msg = postcard_ser(&gossip_msg, &mut buf);
755 self.sender.broadcast(gossip_msg).await.ok();
756 self.broadcast_tx.send(BroadcastItem::Entry((msg.scope, msg.key.clone(), msg.value.clone()))).ok();
757 msg.tx.send(()).await.ok();
758 }
759 Message::Get(msg) => {
760 let res = self.state.get(&msg.scope, &msg.key);
761 msg.tx.send(res.cloned()).await.ok();
762 }
763 Message::Subscribe(msg) => {
764 let broadcast_rx = self.broadcast_tx.subscribe();
765 let filter = msg.filter.clone();
766 let (current, future) = match msg.mode {
767 SubscribeMode::Current => (Some(self.state.snapshot()), None),
768 SubscribeMode::Future => (None, Some(broadcast_rx)),
769 SubscribeMode::Both => (Some(self.state.snapshot()), Some(broadcast_rx)),
770 };
771 tasks.push(Box::pin(Self::handle_subscribe(msg.tx, filter, current, future)));
772 }
773 Message::JoinPeers(msg) => {
774 let res = self.sender.join_peers(msg.peers.clone()).await;
775 msg.tx.send(()).await.ok();
776 if let Err(e) = res {
777 error!("Error joining peers: {:?}", e);
778 break;
779 }
780 }
781 Message::Shutdown(msg) => {
782 msg.tx.send(()).await.ok();
783 break;
784 }
785 }
786 }
787 msg = self.receiver.next() => {
788 trace!("Received gossip message {:?}", msg);
789 let Some(msg) = msg else {
790 error!("Gossip receiver closed");
791 break;
792 };
793 let msg = match msg {
794 Ok(msg) => msg,
795 Err(cause) => {
796 error!("Error receiving message: {:?}", cause);
797 break;
798 }
799 };
800 let msg = match msg {
801 Event::Received(msg) => msg,
802 Event::NeighborUp(peer) => {
803 trace!("New peer {}, starting fast anti-entropy", peer.fmt_short());
804 anti_entropy.set(Self::anti_entropy(self.state.snapshot(), self.sender.clone(), self.config.fast_anti_entropy_interval));
805 continue;
806 },
807 Event::NeighborDown(peer) => {
808 trace!("Peer down: {}, goodbye!", peer.fmt_short());
809 continue;
810 },
811 e => {
812 trace!("Ignoring event: {:?}", e);
813 continue
814 },
815 };
816 let msg = match postcard::from_bytes::<GossipMessage>(&msg.content) {
817 Ok(msg) => msg,
818 Err(e) => {
819 trace!("Error deserializing gossip message: {:?}", e);
820 continue;
821 }
822 };
823 match msg {
824 GossipMessage::SignedValue(scope, key, value) => {
825 if let Some(horizon) = self.horizon()
826 && value.timestamp < horizon {
827 trace!("Ignoring value key={:?} epoch={} below horizon", key, value.timestamp);
828 continue;
829 }
830 let id = scope.fmt_short();
831 trace!(%id, "Received signed value key={:?} epoch={}", key, value.timestamp);
832 let Ok(_) = self.state.insert_signed_value(scope, key.clone(), value.clone()) else {
833 continue;
834 };
835 trace!(%id, "Broadcasting internally");
836 self.broadcast_tx.send(BroadcastItem::Entry((scope, key, value))).ok();
837 }
838 }
839 }
840 res = &mut anti_entropy => {
841 if let Err(e) = res {
842 error!("Error in anti-entropy: {:?}", e);
843 break;
844 }
845 anti_entropy.set(Self::anti_entropy(self.state.snapshot(), self.sender.clone(), self.config.anti_entropy_interval));
847 }
848 _ = &mut apply_horizon => {
849 let expired = self.apply_horizon();
850 for entry in expired {
851 self.broadcast_tx.send(BroadcastItem::Expired(entry)).ok();
852 }
853 apply_horizon.set(Box::pin(tokio::time::sleep(horizon_period)));
855 }
856 _ = tasks.next(), if !tasks.is_empty() => {}
857 }
858 }
859 }
860}
861
862#[derive(Debug, Clone)]
863pub struct Config {
864 pub anti_entropy_interval: Duration,
865 pub fast_anti_entropy_interval: Duration,
866 pub expiry: Option<ExpiryConfig>,
869}
870
871#[derive(Debug, Clone)]
872pub struct ExpiryConfig {
873 pub horizon: Duration,
875 pub check_interval: Duration,
877}
878
879impl Config {
880 pub const DEBUG: Self = Self {
881 anti_entropy_interval: Duration::from_secs(30),
882 fast_anti_entropy_interval: Duration::from_secs(5),
883 expiry: Some(ExpiryConfig {
884 horizon: Duration::from_secs(30),
885 check_interval: Duration::from_secs(10),
886 }),
887 };
888}
889
890impl Default for Config {
891 fn default() -> Self {
892 Self {
893 anti_entropy_interval: Duration::from_secs(300),
895 fast_anti_entropy_interval: Duration::from_secs(10),
897 expiry: Some(ExpiryConfig {
899 horizon: Duration::from_secs(3600 * 24),
900 check_interval: Duration::from_secs(3600),
901 }),
902 }
903 }
904}
905
906pub mod util {
907 use std::{
909 fmt,
910 time::{Duration, SystemTime},
911 };
912
913 use bytes::{Bytes, BytesMut};
914 use serde::Serialize;
915
916 pub fn to_nanos(t: &SystemTime) -> u64 {
924 t.duration_since(SystemTime::UNIX_EPOCH)
925 .expect("system time before unix epoch")
926 .as_nanos()
927 .try_into()
928 .expect("u64 nanos")
929 }
930
931 pub fn current_timestamp() -> u64 {
932 to_nanos(&SystemTime::now())
933 }
934
935 pub fn from_nanos(nanos: u64) -> SystemTime {
936 SystemTime::UNIX_EPOCH + Duration::from_nanos(nanos)
937 }
938
939 pub(crate) fn postcard_ser<T: Serialize>(value: &T, buf: &mut BytesMut) -> Bytes {
940 buf.clear();
941 postcard::to_extend(value, ExtendBytesMut(buf)).expect("value to buf");
942 buf.split().into()
943 }
944
945 struct ExtendBytesMut<'a>(&'a mut BytesMut);
946
947 impl<'a> Extend<u8> for ExtendBytesMut<'a> {
948 fn extend<T: IntoIterator<Item = u8>>(&mut self, iter: T) {
949 for b in iter {
950 self.0.extend_from_slice(&[b]);
951 }
952 }
953 }
954
955 pub fn next_prefix(bytes: &mut [u8]) -> bool {
956 for byte in bytes.iter_mut().rev() {
957 if *byte < 255 {
958 *byte += 1;
959 return true;
960 }
961 *byte = 0;
962 }
963 false
964 }
965
966 pub fn format_bytes(bytes: &[u8]) -> String {
967 if bytes.is_empty() {
968 return "\"\"".to_string();
969 }
970 let Ok(s) = std::str::from_utf8(bytes) else {
971 return hex::encode(bytes);
972 };
973 if s.chars()
974 .any(|c| c.is_control() && c != '\n' && c != '\t' && c != '\r')
975 {
976 return hex::encode(bytes);
977 }
978 format!("\"{}\"", escape_string(s))
979 }
980
981 pub fn escape_string(s: &str) -> String {
982 s.chars()
983 .map(|c| match c {
984 '"' => "\\\"".to_string(),
985 '\\' => "\\\\".to_string(),
986 '\n' => "\\n".to_string(),
987 '\t' => "\\t".to_string(),
988 '\r' => "\\r".to_string(),
989 c => c.to_string(),
990 })
991 .collect()
992 }
993 pub(crate) struct DD<T: fmt::Display>(pub T);
994
995 impl<T: fmt::Display> fmt::Debug for DD<T> {
996 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
997 fmt::Display::fmt(&self.0, f)
998 }
999 }
1000}
1001
1002#[cfg(feature = "filter-parser")]
1003mod peg_parser {
1004 use std::{
1005 collections::HashSet,
1006 fmt::{self, Display},
1007 ops::Bound,
1008 str::FromStr,
1009 time::SystemTime,
1010 };
1011
1012 use bytes::Bytes;
1013 use iroh::PublicKey;
1014
1015 use crate::{
1016 Filter,
1017 util::{format_bytes, from_nanos},
1018 };
1019
1020 impl Display for Filter {
1021 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1022 let mut parts = Vec::new();
1023
1024 if let Some(scopes) = &self.scope
1026 && !scopes.is_empty()
1027 {
1028 let scope_list: Vec<String> = scopes.iter().map(|k| k.to_string()).collect();
1029 parts.push(format!("scope={{{}}}", scope_list.join(",")));
1030 }
1031
1032 match &self.key {
1034 (Bound::Unbounded, Bound::Unbounded) => {} (Bound::Included(start), Bound::Included(end)) if start == end => {
1036 parts.push(format!("key={}", format_bytes(start)));
1038 }
1039 (start_bound, end_bound) => {
1040 let start_str = match start_bound {
1042 Bound::Unbounded => String::new(),
1043 Bound::Included(bytes) | Bound::Excluded(bytes) => format_bytes(bytes),
1044 };
1045
1046 let end_str = match end_bound {
1047 Bound::Unbounded => String::new(),
1048 Bound::Included(bytes) | Bound::Excluded(bytes) => format_bytes(bytes),
1049 };
1050
1051 let op = match end_bound {
1052 Bound::Included(_) => "..=",
1053 _ => "..",
1054 };
1055
1056 parts.push(format!("key={start_str}{op}{end_str}"));
1057 }
1058 }
1059
1060 match &self.timestamp {
1062 (Bound::Unbounded, Bound::Unbounded) => {} (start_bound, end_bound) => {
1064 let start_str = match start_bound {
1065 Bound::Unbounded => String::new(),
1066 Bound::Included(time) | Bound::Excluded(time) => {
1067 use chrono::{DateTime, Utc};
1068 let dt: DateTime<Utc> = (from_nanos(*time)).into();
1069 dt.to_rfc3339()
1070 }
1071 };
1072
1073 let end_str = match end_bound {
1074 Bound::Unbounded => String::new(),
1075 Bound::Included(time) | Bound::Excluded(time) => {
1076 use chrono::{DateTime, Utc};
1077 let dt: DateTime<Utc> = (from_nanos(*time)).into();
1078 dt.to_rfc3339()
1079 }
1080 };
1081
1082 let op = match end_bound {
1083 Bound::Included(_) => "..=",
1084 _ => "..",
1085 };
1086
1087 parts.push(format!("time={start_str}{op}{end_str}"));
1088 }
1089 }
1090
1091 write!(f, "{}", parts.join(" & "))
1092 }
1093 }
1094
1095 impl FromStr for Filter {
1096 type Err = String;
1097
1098 fn from_str(s: &str) -> Result<Self, Self::Err> {
1099 parse_filter(s)
1100 }
1101 }
1102
1103 peg::parser! {
1104 grammar filter_parser() for str {
1105 pub rule filter() -> Filter
1106 = _ parts:(filter_part() ** (_ "&" _)) _ {
1107 let mut filter = Filter::ALL;
1108 for part in parts {
1109 match part {
1110 FilterPart::Scope(keys) => filter = filter.scopes(keys),
1111 FilterPart::Key(range) => filter = filter.keys(range),
1112 FilterPart::KeyPrefix(prefix) => filter = filter.key_prefix(prefix),
1113 FilterPart::Time(range) => filter = filter.timestamps(range),
1114 }
1115 }
1116 filter
1117 }
1118
1119 rule filter_part() -> FilterPart
1120 = scope_part() / key_part() / time_part()
1121
1122 rule scope_part() -> FilterPart
1123 = "scope" _ "=" _ "{" _ keys:(hex_value() ** (_ "," _)) _ "}" {
1124 let keys = keys.into_iter()
1125 .filter_map(|s| PublicKey::from_str(&s).ok())
1126 .collect();
1127 FilterPart::Scope(keys)
1128 }
1129
1130 rule key_part() -> FilterPart
1131 = "key" _ "=" _ result:(key_prefix() / key_range() / key_single()) { result }
1132
1133 rule key_prefix() -> FilterPart
1134 = v:value() _ "*" { FilterPart::KeyPrefix(v) }
1135
1136 rule key_range() -> FilterPart
1137 = start:value()? _ op:range_op() _ end:value()? {
1138 let start_bound = start.map_or(Bound::Unbounded, Bound::Included);
1139 let end_bound = end.map_or(Bound::Unbounded, |v| {
1140 if op { Bound::Included(v) } else { Bound::Excluded(v) }
1141 });
1142 FilterPart::Key((start_bound, end_bound))
1143 }
1144
1145 rule key_single() -> FilterPart
1146 = v:value() { FilterPart::Key((Bound::Included(v.clone()), Bound::Included(v))) }
1147
1148 rule time_part() -> FilterPart
1149 = "time" _ "=" _ range:time_range() { FilterPart::Time(range) }
1150
1151 rule time_range() -> (Bound<SystemTime>, Bound<SystemTime>)
1152 = start:timestamp()? _ op:range_op() _ end:timestamp()? {
1153 let start_bound = start.flatten().map_or(Bound::Unbounded, Bound::Included);
1154 let end_bound = end.flatten().map_or(Bound::Unbounded, |v| {
1155 if op { Bound::Included(v) } else { Bound::Excluded(v) }
1156 });
1157 (start_bound, end_bound)
1158 }
1159
1160 rule range_op() -> bool
1161 = "..=" { true } / ".." { false }
1162
1163 rule value() -> Bytes
1164 = quoted_string() / hex_bytes()
1165
1166 rule quoted_string() -> Bytes
1167 = "\"" s:string_content() "\"" { Bytes::from(s) }
1168
1169 rule string_content() -> String
1170 = chars:string_char()* { chars.into_iter().collect() }
1171
1172 rule string_char() -> char
1173 = "\\\\" { '\\' }
1174 / "\\\"" { '"' }
1175 / "\\n" { '\n' }
1176 / "\\t" { '\t' }
1177 / "\\r" { '\r' }
1178 / c:$(!['\"' | '\\'] [_]) { c.chars().next().unwrap() }
1179
1180 rule hex_bytes() -> Bytes
1181 = s:$(['0'..='9' | 'a'..='f' | 'A'..='F']+) {?
1182 hex::decode(s)
1183 .map(Bytes::from)
1184 .map_err(|_| "invalid hex")
1185 }
1186
1187 rule hex_value() -> String
1188 = s:$(['0'..='9' | 'a'..='f' | 'A'..='F']+) { s.to_string() }
1189
1190 rule timestamp() -> Option<SystemTime>
1191 = s:timestamp_chars() {
1192 use chrono::{DateTime, Utc};
1193 DateTime::parse_from_rfc3339(s.trim())
1194 .map(|dt| dt.with_timezone(&Utc).into()).ok()
1195 }
1196
1197 rule timestamp_chars() -> &'input str
1198 = $([^ '&' | ' ' | '\t' | '\n' | '\r' | '.']+)
1199
1200 rule _() = [' ' | '\t' | '\n' | '\r']*
1201 }
1202 }
1203
1204 #[derive(Debug)]
1205 enum FilterPart {
1206 Scope(HashSet<PublicKey>),
1207 Key((Bound<Bytes>, Bound<Bytes>)),
1208 KeyPrefix(Bytes),
1209 Time((Bound<SystemTime>, Bound<SystemTime>)),
1210 }
1211
1212 pub fn parse_filter(input: &str) -> Result<Filter, String> {
1213 filter_parser::filter(input).map_err(|e| e.to_string())
1214 }
1215}