netbeam/sync/
subscription.rs

1//! # Subscription Stream Implementation
2//!
3//! Provides bidirectional subscription-based streaming capabilities for multiplexed
4//! network connections, enabling reliable ordered communication between nodes.
5//!
6//! ## Features
7//!
8//! - **Bidirectional Streaming**:
9//!   - Reliable ordered message delivery
10//!   - Automatic stream management
11//!   - Multiplexed connections
12//!   - Stream identification
13//!
14//! - **Subscription Management**:
15//!   - Stream subscription handling
16//!   - Connection lifecycle management
17//!   - Node type awareness
18//!   - Resource cleanup
19//!
20//! - **Multiplexing Support**:
21//!   - Stream multiplexing
22//!   - Connection sharing
23//!   - Stream isolation
24//!   - Connection pooling
25//!
26//! ## Important Notes
27//!
28//! - Implements async/await patterns
29//! - Ensures thread-safety
30//! - Handles connection cleanup
31//! - Maintains message ordering
32//!
33//! ## Related Components
34//!
35//! - [`MultiplexedConn`]: Connection multiplexing
36//! - [`ReliableOrderedStreamToTarget`]: Stream reliability
37//! - [`network_application`]: Application integration
38//! - [`RelativeNodeType`]: Node type management
39
40use crate::multiplex::{MemorySender, MultiplexedConn, MultiplexedConnKey, MultiplexedPacket};
41use crate::reliable_conn::ReliableOrderedStreamToTarget;
42use crate::sync::network_application::{
43    PostActionChannel, PostActionSync, PreActionChannel, PreActionSync,
44};
45use crate::sync::RelativeNodeType;
46use async_trait::async_trait;
47use bytes::Bytes;
48use citadel_io::tokio::sync::mpsc::UnboundedReceiver;
49use citadel_io::tokio::sync::Mutex;
50use citadel_io::RwLock;
51use std::collections::HashMap;
52
53#[async_trait]
54pub trait SubscriptionBiStream: Send + Sync {
55    type Conn: ReliableOrderedStreamToTarget + 'static;
56    type ID: MultiplexedConnKey;
57
58    fn conn(&self) -> &Self::Conn;
59    fn receiver(&self) -> &Mutex<UnboundedReceiver<Vec<u8>>>;
60    fn id(&self) -> Self::ID;
61    fn node_type(&self) -> RelativeNodeType;
62}
63
64#[async_trait]
65pub trait SubscriptionBiStreamExt: SubscriptionBiStream {
66    /// Creates a new multiplexed level capable of obtaining more subscribers.
67    /// Uses Self as a reliable ordered connection, while using NewId to identify the substreams in the created next level
68    async fn multiplex<NewID: MultiplexedConnKey + 'static>(
69        self,
70    ) -> Result<MultiplexedConn<NewID>, anyhow::Error>
71    where
72        Self: Sized + 'static,
73    {
74        MultiplexedConn::register(self.node_type(), self).await
75    }
76}
77
78impl<T: SubscriptionBiStream> SubscriptionBiStreamExt for T {}
79
80#[async_trait::async_trait]
81pub trait Subscribable: Send + Sync + Sized {
82    type ID: MultiplexedConnKey;
83    type UnderlyingConn: ReliableOrderedStreamToTarget + 'static;
84    type SubscriptionType: SubscriptionBiStream;
85    type BorrowedSubscriptionType: SubscriptionBiStream<ID = Self::ID, Conn = Self::UnderlyingConn>
86        + Into<Self::SubscriptionType>;
87    // TODO on stabalization of GATs: type BorrowedSubscriptionType<'a>: SubscriptionBiStream<ID=Self::ID, Conn=Self::UnderlyingConn> + Into<Self::SubscriptionType>;
88
89    fn underlying_conn(&self) -> &Self::UnderlyingConn;
90    fn subscriptions(&self) -> &RwLock<HashMap<Self::ID, MemorySender>>;
91    fn post_close_container(&self) -> &PostActionChannel<Self::ID>;
92    fn pre_action_container(&self) -> &PreActionChannel<Self::ID>;
93
94    async fn recv_post_close_signal_from_stream(&self, id: Self::ID) -> Result<(), anyhow::Error>;
95    async fn send_post_close_signal(&self, id: Self::ID) -> Result<(), anyhow::Error>;
96    async fn send_pre_open_signal(&self, id: Self::ID) -> Result<(), anyhow::Error>;
97
98    fn node_type(&self) -> RelativeNodeType;
99
100    fn initiate_subscription(&self) -> PreActionSync<'_, Self, Self::UnderlyingConn> {
101        PreActionSync::new(self)
102    }
103
104    fn get_next_prereserved(&self) -> Option<Self::BorrowedSubscriptionType>;
105    fn subscribe(&self, id: Self::ID) -> Self::BorrowedSubscriptionType;
106    fn owned_subscription(&self, id: Self::ID) -> Self::SubscriptionType;
107    fn get_next_id(&self) -> Self::ID;
108}
109
110#[async_trait]
111impl<R: SubscriptionBiStream + ?Sized> ReliableOrderedStreamToTarget for R {
112    async fn send_to_peer(&self, input: &[u8]) -> std::io::Result<()> {
113        let packet = MultiplexedPacket::ApplicationLayer {
114            id: self.id(),
115            payload: input.to_vec(),
116        };
117        self.conn()
118            .send_to_peer(&bincode::serialize(&packet).unwrap())
119            .await
120    }
121
122    async fn recv(&self) -> std::io::Result<Bytes> {
123        self.receiver()
124            .lock()
125            .await
126            .recv()
127            .await
128            .map(Bytes::from)
129            .ok_or_else(|| {
130                std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Receiver died")
131            })
132    }
133}
134
135pub(crate) fn close_sequence_for_multiplexed_bistream<
136    S: Subscribable<ID = K> + 'static,
137    K: MultiplexedConnKey + 'static,
138>(
139    id: K,
140    ptr: S,
141) {
142    log::trace!(target: "citadel", "Running DROP on {:?}", id);
143
144    fn close<S: Subscribable<ID = K>, K: MultiplexedConnKey>(id: K, ptr: &S) {
145        let _ = ptr.subscriptions().write().remove(&id);
146        log::trace!(target: "citadel", "DROPPED id = {:?}", id);
147    }
148
149    // the runtime may not exist while dropping
150    if let Ok(rt) = citadel_io::tokio::runtime::Handle::try_current() {
151        rt.spawn(async move {
152            if let Err(err) = PostActionSync::new(&ptr, id).await {
153                log::warn!(target: "citadel", "[MetaActionSync/close] error: {:?}", err.to_string())
154            }
155
156            close(id, &ptr)
157        });
158    } else {
159        close(id, &ptr);
160    }
161}