Skip to main content

cdk_common/pub_sub/
mod.rs

1//! Publish/Subscribe core
2//!
3//! This module defines the transport-agnostic pub/sub primitives used by both
4//! mint and wallet components. The design prioritizes:
5//!
6//! - **Request coalescing**: multiple local subscribers to the same remote topic
7//!   result in a single upstream subscription, with local fan‑out.
8//! - **Latest-on-subscribe** (NUT-17): on (re)subscription, the most recent event
9//!   is fetched and delivered before streaming new ones.
10//! - **Backpressure-aware delivery**: bounded channels + drop policies prevent
11//!   a slow consumer from stalling the whole pipeline.
12//! - **Resilience**: automatic reconnect with exponential backoff; WebSocket
13//!   streaming when available, HTTP long-poll fallback otherwise.
14//!
15//! Terms used throughout the module:
16//! - **Event**: a domain object that maps to one or more `Topic`s via `Event::get_topics`.
17//! - **Topic**: an index/type that defines storage and matching semantics.
18//! - **SubscriptionRequest**: a domain-specific filter that can be converted into
19//!   low-level transport messages (e.g., WebSocket subscribe frames).
20//! - **Spec**: type bundle tying `Event`, `Topic`, `SubscriptionId`, and serialization.
21
22mod error;
23mod pubsub;
24pub mod remote_consumer;
25mod subscriber;
26mod types;
27
28pub use self::error::Error;
29pub use self::pubsub::Pubsub;
30pub use self::subscriber::{Subscriber, SubscriptionRequest};
31pub use self::types::*;
32
33#[cfg(test)]
34mod test {
35    use std::collections::HashMap;
36    use std::sync::{Arc, RwLock};
37
38    use serde::{Deserialize, Serialize};
39
40    use super::subscriber::SubscriptionRequest;
41    use super::{Error, Event, Pubsub, Spec, Subscriber};
42
43    #[derive(Clone, Debug, Serialize, Eq, PartialEq, Deserialize)]
44    pub struct Message {
45        pub foo: u64,
46        pub bar: u64,
47    }
48
49    #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
50    pub enum IndexTest {
51        Foo(u64),
52        Bar(u64),
53    }
54
55    impl Event for Message {
56        type Topic = IndexTest;
57
58        fn get_topics(&self) -> Vec<Self::Topic> {
59            vec![IndexTest::Foo(self.foo), IndexTest::Bar(self.bar)]
60        }
61    }
62
63    pub struct CustomPubSub {
64        pub storage: Arc<RwLock<HashMap<IndexTest, Message>>>,
65    }
66
67    #[async_trait::async_trait]
68    impl Spec for CustomPubSub {
69        type Topic = IndexTest;
70
71        type Event = Message;
72
73        type SubscriptionId = String;
74
75        type Context = ();
76
77        fn new_instance(_context: Self::Context) -> Arc<Self>
78        where
79            Self: Sized,
80        {
81            Arc::new(Self {
82                storage: Default::default(),
83            })
84        }
85
86        async fn fetch_events(
87            self: &Arc<Self>,
88            topics: Vec<<Self::Event as Event>::Topic>,
89            reply_to: Subscriber<Self>,
90        ) where
91            Self: Sized,
92        {
93            let storage = self.storage.read().unwrap();
94
95            for index in topics {
96                if let Some(value) = storage.get(&index) {
97                    let _ = reply_to.send(value.clone());
98                }
99            }
100        }
101    }
102
103    #[derive(Debug, Clone)]
104    pub enum SubscriptionReq {
105        Foo(u64),
106        Bar(u64),
107    }
108
109    impl SubscriptionRequest for SubscriptionReq {
110        type Topic = IndexTest;
111
112        type SubscriptionId = String;
113
114        fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
115            Ok(vec![match self {
116                SubscriptionReq::Bar(n) => IndexTest::Bar(*n),
117                SubscriptionReq::Foo(n) => IndexTest::Foo(*n),
118            }])
119        }
120
121        fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
122            Arc::new("test".to_owned())
123        }
124    }
125
126    #[derive(Debug, Clone)]
127    pub struct FailingSubscriptionReq;
128
129    impl SubscriptionRequest for FailingSubscriptionReq {
130        type Topic = IndexTest;
131
132        type SubscriptionId = String;
133
134        fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
135            Err(Error::ParsingError("intentional failure".to_string()))
136        }
137
138        fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
139            Arc::new("failing-sub".to_owned())
140        }
141    }
142
143    #[tokio::test]
144    async fn delivery_twice_realtime() {
145        let pubsub = Pubsub::new(CustomPubSub::new_instance(()));
146
147        assert_eq!(pubsub.active_subscribers(), 0);
148
149        let mut subscriber = pubsub.subscribe(SubscriptionReq::Foo(2)).unwrap();
150
151        assert_eq!(pubsub.active_subscribers(), 1);
152
153        let _ = pubsub.publish_now(Message { foo: 2, bar: 1 });
154        let _ = pubsub.publish_now(Message { foo: 2, bar: 2 });
155
156        assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(1));
157        assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(2));
158        assert!(subscriber.try_recv().is_none());
159
160        drop(subscriber);
161
162        assert_eq!(pubsub.active_subscribers(), 0);
163    }
164
165    #[tokio::test]
166    async fn failed_subscribe_does_not_leak_active_subscribers() {
167        let pubsub = Pubsub::new(CustomPubSub::new_instance(()));
168
169        assert_eq!(pubsub.active_subscribers(), 0);
170
171        let result = pubsub.subscribe(FailingSubscriptionReq);
172
173        assert!(result.is_err());
174        assert_eq!(pubsub.active_subscribers(), 0);
175    }
176
177    #[tokio::test]
178    async fn read_from_storage() {
179        let x = CustomPubSub::new_instance(());
180        let storage = x.storage.clone();
181
182        let pubsub = Pubsub::new(x);
183
184        {
185            // set previous value
186            let mut s = storage.write().unwrap();
187            s.insert(IndexTest::Bar(2), Message { foo: 3, bar: 2 });
188        }
189
190        let mut subscriber = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
191
192        // Just should receive the latest
193        assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(3));
194
195        // realtime delivery test
196        let _ = pubsub.publish_now(Message { foo: 1, bar: 2 });
197        assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(1));
198
199        {
200            // set previous value
201            let mut s = storage.write().unwrap();
202            s.insert(IndexTest::Bar(2), Message { foo: 1, bar: 2 });
203        }
204
205        // new subscription should only get the latest state (it is up to the Topic trait)
206        let mut y = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
207        assert_eq!(y.recv().await.map(|x| x.foo), Some(1));
208    }
209}