netbeam/sync/
subscription.rs1use crate::multiplex::{MemorySender, MultiplexedConn, MultiplexedConnKey, MultiplexedPacket};
41use crate::reliable_conn::ReliableOrderedStreamToTarget;
42use crate::sync::network_application::{
43 PostActionChannel, PostActionSync, PreActionChannel, PreActionSync,
44};
45use crate::sync::RelativeNodeType;
46use async_trait::async_trait;
47use bytes::Bytes;
48use citadel_io::tokio::sync::mpsc::UnboundedReceiver;
49use citadel_io::tokio::sync::Mutex;
50use citadel_io::RwLock;
51use std::collections::HashMap;
52
53#[async_trait]
54pub trait SubscriptionBiStream: Send + Sync {
55 type Conn: ReliableOrderedStreamToTarget + 'static;
56 type ID: MultiplexedConnKey;
57
58 fn conn(&self) -> &Self::Conn;
59 fn receiver(&self) -> &Mutex<UnboundedReceiver<Vec<u8>>>;
60 fn id(&self) -> Self::ID;
61 fn node_type(&self) -> RelativeNodeType;
62}
63
64#[async_trait]
65pub trait SubscriptionBiStreamExt: SubscriptionBiStream {
66 async fn multiplex<NewID: MultiplexedConnKey + 'static>(
69 self,
70 ) -> Result<MultiplexedConn<NewID>, anyhow::Error>
71 where
72 Self: Sized + 'static,
73 {
74 MultiplexedConn::register(self.node_type(), self).await
75 }
76}
77
78impl<T: SubscriptionBiStream> SubscriptionBiStreamExt for T {}
79
80#[async_trait::async_trait]
81pub trait Subscribable: Send + Sync + Sized {
82 type ID: MultiplexedConnKey;
83 type UnderlyingConn: ReliableOrderedStreamToTarget + 'static;
84 type SubscriptionType: SubscriptionBiStream;
85 type BorrowedSubscriptionType: SubscriptionBiStream<ID = Self::ID, Conn = Self::UnderlyingConn>
86 + Into<Self::SubscriptionType>;
87 fn underlying_conn(&self) -> &Self::UnderlyingConn;
90 fn subscriptions(&self) -> &RwLock<HashMap<Self::ID, MemorySender>>;
91 fn post_close_container(&self) -> &PostActionChannel<Self::ID>;
92 fn pre_action_container(&self) -> &PreActionChannel<Self::ID>;
93
94 async fn recv_post_close_signal_from_stream(&self, id: Self::ID) -> Result<(), anyhow::Error>;
95 async fn send_post_close_signal(&self, id: Self::ID) -> Result<(), anyhow::Error>;
96 async fn send_pre_open_signal(&self, id: Self::ID) -> Result<(), anyhow::Error>;
97
98 fn node_type(&self) -> RelativeNodeType;
99
100 fn initiate_subscription(&self) -> PreActionSync<'_, Self, Self::UnderlyingConn> {
101 PreActionSync::new(self)
102 }
103
104 fn get_next_prereserved(&self) -> Option<Self::BorrowedSubscriptionType>;
105 fn subscribe(&self, id: Self::ID) -> Self::BorrowedSubscriptionType;
106 fn owned_subscription(&self, id: Self::ID) -> Self::SubscriptionType;
107 fn get_next_id(&self) -> Self::ID;
108}
109
110#[async_trait]
111impl<R: SubscriptionBiStream + ?Sized> ReliableOrderedStreamToTarget for R {
112 async fn send_to_peer(&self, input: &[u8]) -> std::io::Result<()> {
113 let packet = MultiplexedPacket::ApplicationLayer {
114 id: self.id(),
115 payload: input.to_vec(),
116 };
117 self.conn()
118 .send_to_peer(&bincode::serialize(&packet).unwrap())
119 .await
120 }
121
122 async fn recv(&self) -> std::io::Result<Bytes> {
123 self.receiver()
124 .lock()
125 .await
126 .recv()
127 .await
128 .map(Bytes::from)
129 .ok_or_else(|| {
130 std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Receiver died")
131 })
132 }
133}
134
135pub(crate) fn close_sequence_for_multiplexed_bistream<
136 S: Subscribable<ID = K> + 'static,
137 K: MultiplexedConnKey + 'static,
138>(
139 id: K,
140 ptr: S,
141) {
142 log::trace!(target: "citadel", "Running DROP on {:?}", id);
143
144 fn close<S: Subscribable<ID = K>, K: MultiplexedConnKey>(id: K, ptr: &S) {
145 let _ = ptr.subscriptions().write().remove(&id);
146 log::trace!(target: "citadel", "DROPPED id = {:?}", id);
147 }
148
149 if let Ok(rt) = citadel_io::tokio::runtime::Handle::try_current() {
151 rt.spawn(async move {
152 if let Err(err) = PostActionSync::new(&ptr, id).await {
153 log::warn!(target: "citadel", "[MetaActionSync/close] error: {:?}", err.to_string())
154 }
155
156 close(id, &ptr)
157 });
158 } else {
159 close(id, &ptr);
160 }
161}