hsipc/
subscription.rs

1//! RPC subscription system implementation
2//!
3//! This module provides the core types for the RPC subscription system,
4//! following the jsonrpsee pattern with PendingSubscriptionSink.
5
6use crate::{Error, Result};
7use serde::{Deserialize, Serialize};
8use std::marker::PhantomData;
9use tokio::sync::mpsc;
10use uuid::Uuid;
11
12/// Pending subscription sink that can be accepted or rejected
13///
14/// This follows the jsonrpsee pattern where subscription requests
15/// can be conditionally accepted or rejected with a reason.
16pub struct PendingSubscriptionSink {
17    id: Uuid,
18    method: String,
19    sender: Option<mpsc::UnboundedSender<serde_json::Value>>,
20}
21
22impl PendingSubscriptionSink {
23    /// Create a new pending subscription sink
24    pub fn new(id: Uuid, method: String, sender: mpsc::UnboundedSender<serde_json::Value>) -> Self {
25        Self {
26            id,
27            method,
28            sender: Some(sender),
29        }
30    }
31
32    /// Accept the subscription and return a SubscriptionSink
33    ///
34    /// This consumes the PendingSubscriptionSink and returns a SubscriptionSink
35    /// that can be used to send data to the subscriber.
36    pub async fn accept(mut self) -> Result<SubscriptionSink> {
37        let sender = self
38            .sender
39            .take()
40            .ok_or_else(|| Error::runtime_msg("Subscription already accepted or rejected"))?;
41
42        // TODO: Send accept message to client through ProcessHub
43        log::trace!(
44            "Subscription {} accepted for method {}",
45            self.id,
46            self.method
47        );
48
49        Ok(SubscriptionSink::new(self.id, sender, self.method))
50    }
51
52    /// Reject the subscription with a reason
53    ///
54    /// This will send a rejection message to the client and consume
55    /// the PendingSubscriptionSink.
56    pub async fn reject(self, reason: String) -> Result<()> {
57        // TODO: Send reject message to client through ProcessHub
58        log::trace!(
59            "Subscription {} rejected for method {}: {}",
60            self.id,
61            self.method,
62            reason
63        );
64
65        // Drop the sender to close the channel
66        drop(self.sender);
67
68        Err(Error::runtime_msg(format!(
69            "Subscription rejected: {reason}"
70        )))
71    }
72
73    /// Get the subscription ID
74    pub fn id(&self) -> Uuid {
75        self.id
76    }
77
78    /// Get the method name
79    pub fn method(&self) -> &str {
80        &self.method
81    }
82}
83
84/// Active subscription sink for sending data to subscribers
85///
86/// This is returned by PendingSubscriptionSink::accept() and provides
87/// methods to send JSON values to the subscribing client.
88pub struct SubscriptionSink {
89    id: Uuid,
90    sender: mpsc::UnboundedSender<serde_json::Value>,
91    method: String,
92}
93
94impl SubscriptionSink {
95    /// Create a new subscription sink
96    pub(crate) fn new(
97        id: Uuid,
98        sender: mpsc::UnboundedSender<serde_json::Value>,
99        method: String,
100    ) -> Self {
101        Self { id, sender, method }
102    }
103
104    /// Send a JSON value to the subscriber
105    ///
106    /// This is the low-level method for sending pre-serialized JSON.
107    pub async fn send(&self, value: serde_json::Value) -> Result<()> {
108        self.sender
109            .send(value)
110            .map_err(|_| Error::runtime_msg("Subscription channel closed"))?;
111        Ok(())
112    }
113
114    /// Send a serializable value to the subscriber
115    ///
116    /// This is a convenience method that serializes the value to JSON
117    /// before sending it.
118    pub async fn send_value<T: Serialize>(&self, value: T) -> Result<()> {
119        let json_value = serde_json::to_value(value)
120            .map_err(|e| Error::runtime_msg(format!("Failed to serialize value: {e}")))?;
121        self.send(json_value).await
122    }
123
124    /// Check if the subscription is still active
125    ///
126    /// Returns true if the subscriber is still connected and able to receive data.
127    pub fn is_closed(&self) -> bool {
128        self.sender.is_closed()
129    }
130
131    /// Get the subscription ID
132    pub fn id(&self) -> Uuid {
133        self.id
134    }
135
136    /// Get the method name
137    pub fn method(&self) -> &str {
138        &self.method
139    }
140}
141
142/// Client-side subscription handle for receiving data
143///
144/// This is the client-side counterpart to SubscriptionSink.
145/// It provides methods to receive deserialized data from the server.
146pub struct RpcSubscription<T> {
147    id: Uuid,
148    receiver: mpsc::UnboundedReceiver<serde_json::Value>,
149    _phantom: PhantomData<T>,
150}
151
152impl<T> RpcSubscription<T>
153where
154    T: for<'de> Deserialize<'de>,
155{
156    /// Create a new RPC subscription
157    pub fn new(id: Uuid, receiver: mpsc::UnboundedReceiver<serde_json::Value>) -> Self {
158        Self {
159            id,
160            receiver,
161            _phantom: PhantomData,
162        }
163    }
164
165    /// Receive the next value from the subscription
166    ///
167    /// Returns None if the subscription has been closed or canceled.
168    /// Returns Some(Err(_)) if there was an error deserializing the data.
169    pub async fn next(&mut self) -> Option<Result<T>> {
170        match self.receiver.recv().await {
171            Some(json_value) => match serde_json::from_value(json_value) {
172                Ok(value) => Some(Ok(value)),
173                Err(e) => Some(Err(Error::runtime_msg(format!(
174                    "Failed to deserialize subscription data: {e}"
175                )))),
176            },
177            None => None,
178        }
179    }
180
181    /// Cancel the subscription
182    ///
183    /// This will close the receiver and notify the server that the
184    /// subscription should be canceled.
185    pub async fn cancel(self) -> Result<()> {
186        // TODO: Send cancel message to server through ProcessHub
187        log::trace!("Subscription {} canceled", self.id);
188
189        // Closing the receiver will signal the server that we're done
190        drop(self.receiver);
191        Ok(())
192    }
193
194    /// Get the subscription ID
195    pub fn id(&self) -> Uuid {
196        self.id
197    }
198}
199
200/// Subscription message types for internal communication
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub enum SubscriptionMessage {
203    /// Request to create a new subscription
204    Request {
205        id: Uuid,
206        method: String,
207        params: serde_json::Value,
208    },
209    /// Subscription was accepted by the server
210    Accept { id: Uuid },
211    /// Subscription was rejected by the server
212    Reject { id: Uuid, reason: String },
213    /// Data sent from server to client
214    Data { id: Uuid, data: serde_json::Value },
215    /// Subscription was canceled
216    Cancel { id: Uuid },
217}
218
219impl SubscriptionMessage {
220    /// Get the subscription ID for this message
221    pub fn id(&self) -> Uuid {
222        match self {
223            SubscriptionMessage::Request { id, .. } => *id,
224            SubscriptionMessage::Accept { id } => *id,
225            SubscriptionMessage::Reject { id, .. } => *id,
226            SubscriptionMessage::Data { id, .. } => *id,
227            SubscriptionMessage::Cancel { id } => *id,
228        }
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235    use serde_json::json;
236
237    #[tokio::test]
238    async fn test_pending_subscription_accept() {
239        let (tx, mut rx) = mpsc::unbounded_channel();
240        let pending = PendingSubscriptionSink::new(Uuid::new_v4(), "test_method".to_string(), tx);
241
242        let sink = pending.accept().await.unwrap();
243        assert_eq!(sink.method(), "test_method");
244
245        // Test sending data
246        sink.send_value("test_data").await.unwrap();
247        let received = rx.recv().await.unwrap();
248        assert_eq!(received, json!("test_data"));
249    }
250
251    #[tokio::test]
252    async fn test_pending_subscription_reject() {
253        let (tx, _rx) = mpsc::unbounded_channel();
254        let pending = PendingSubscriptionSink::new(Uuid::new_v4(), "test_method".to_string(), tx);
255
256        let result = pending.reject("Invalid parameters".to_string()).await;
257        assert!(result.is_err());
258    }
259
260    #[tokio::test]
261    async fn test_rpc_subscription() {
262        let (tx, rx) = mpsc::unbounded_channel();
263        let mut subscription: RpcSubscription<String> = RpcSubscription::new(Uuid::new_v4(), rx);
264
265        // Send data through the channel
266        tx.send(json!("test_message")).unwrap();
267
268        // Receive and verify
269        let received = subscription.next().await.unwrap().unwrap();
270        assert_eq!(received, "test_message");
271    }
272
273    #[tokio::test]
274    async fn test_subscription_closed() {
275        let (tx, rx) = mpsc::unbounded_channel();
276        let mut subscription: RpcSubscription<String> = RpcSubscription::new(Uuid::new_v4(), rx);
277
278        // Drop sender to close the channel
279        drop(tx);
280
281        // Should return None when closed
282        let result = subscription.next().await;
283        assert!(result.is_none());
284    }
285}