1use std::{
2 collections::HashMap,
3 io::{Error as IoError, ErrorKind},
4 marker::PhantomData,
5 str::FromStr,
6 sync::{Arc, PoisonError, RwLock, RwLockReadGuard},
7 time::Duration,
8};
9
10use camino::Utf8PathBuf;
11use log::{debug, error, info};
12use tokio::{
13 runtime::{self},
14 sync::{
15 mpsc::{self, Receiver, Sender},
16 oneshot,
17 },
18 task::JoinHandle,
19};
20
21use super::{transport::PDUTransport, Daemon};
22use crate::{
23 daemon::{EntityConfig, Indication, PutRequest, Report, UserPrimitive},
24 filestore::{FileStore, NativeFileStore},
25 pdu::{EntityID, TransactionSeqNum},
26 transaction::TransactionID,
27};
28
29#[derive(Debug)]
30pub struct JoD<'a, T> {
31 pub handle: Vec<JoinHandle<T>>,
32 phantom: PhantomData<&'a ()>,
33}
34#[allow(clippy::needless_lifetimes)]
35impl<'a, T> From<JoinHandle<T>> for JoD<'a, T> {
36 fn from(input: JoinHandle<T>) -> Self {
37 Self {
38 handle: vec![input],
39 phantom: PhantomData,
40 }
41 }
42}
43
44#[allow(clippy::needless_lifetimes)]
45impl<'a, T> Drop for JoD<'a, T> {
46 fn drop(&mut self) {
47 for handle in self.handle.drain(..) {
48 handle.abort();
49 }
50 }
51}
52
53pub struct UserHalf {
54 internal_tx: Sender<UserPrimitive>,
55 _indication_handle: JoinHandle<()>,
56 history: Arc<RwLock<HashMap<TransactionID, Report>>>,
57 tokio_handle: tokio::runtime::Handle,
58}
59
60impl UserHalf {
61 #[allow(unused)]
62 pub async fn put(&self, request: PutRequest) -> Result<TransactionID, IoError> {
63 let (put_send, put_recv) = oneshot::channel();
64 let primitive = UserPrimitive::Put(request, put_send);
65
66 self.internal_tx
67 .send_timeout(primitive, Duration::from_secs(1))
68 .await
69 .map_err(|_| {
70 IoError::new(
71 ErrorKind::ConnectionReset,
72 "1 Daemon Half of User disconnected.",
73 )
74 })?;
75 put_recv.await.map_err(|_| {
76 IoError::new(
77 ErrorKind::ConnectionReset,
78 "1 Daemon Half of User disconnected.",
79 )
80 })
81 }
82
83 #[allow(unused)]
84 pub fn cancel(&self, transaction: TransactionID) -> Result<(), IoError> {
85 self.tokio_handle.block_on(async {
86 let primitive = UserPrimitive::Cancel(transaction);
87 self.internal_tx.send(primitive).await.map_err(|_| {
88 IoError::new(
89 ErrorKind::ConnectionReset,
90 "Daemon Half of User disconnected.",
91 )
92 })
93 })
94 }
95
96 #[allow(unused)]
97 pub async fn report(&self, transaction: TransactionID) -> Result<Option<Report>, IoError> {
98 let (report_tx, mut report_rx) = oneshot::channel();
99 let primitive = UserPrimitive::Report(transaction, report_tx);
100
101 self.internal_tx
102 .send_timeout(primitive, Duration::from_secs(1))
103 .await
104 .map_err(|err| {
105 IoError::new(
106 ErrorKind::ConnectionReset,
107 format!("Daemon Half of User disconnected on send: {err}"),
108 )
109 })?;
110 let response = match report_rx.try_recv() {
111 Ok(report) => Some(report),
112 Err(_) => self.history.read().unwrap().get(&transaction).cloned(),
114 };
115 Ok(response)
116 }
117
118 #[allow(unused)]
119 pub fn history(
120 &self,
121 ) -> Result<Vec<Report>, PoisonError<RwLockReadGuard<'_, HashMap<TransactionID, Report>>>> {
122 self.history.read().map(|history| {
123 let mut histories: Vec<Report> = Vec::with_capacity(history.len());
124 histories.extend(history.values().cloned());
125 histories
126 })
127 }
128
129 #[allow(unused)]
130 pub fn get_local_report(&self, transaction: TransactionID) -> Result<Option<Report>, IoError> {
131 self.history
132 .read()
133 .map(|history| history.get(&transaction).cloned())
134 .map_err(|e| IoError::new(ErrorKind::Other, e.to_string()))
135 }
136}
137
138pub struct StaticAssets {
139 pub filestore: Arc<NativeFileStore>,
140 _tokio_runtime: tokio::runtime::Runtime,
141}
142
143pub fn static_assets(filestore_path: &str) -> StaticAssets {
144 let utf8_path = Utf8PathBuf::from_str(filestore_path).unwrap();
145 let filestore = Arc::new(NativeFileStore::new(&utf8_path));
146 let _tokio_runtime = tokio::runtime::Builder::new_multi_thread()
147 .worker_threads(1)
148 .enable_io()
149 .enable_time()
150 .build()
151 .unwrap();
152
153 StaticAssets {
154 filestore,
155 _tokio_runtime,
156 }
157}
158
159type UserSplit = (UserHalf, Receiver<UserPrimitive>, Sender<Indication>);
160
161#[async_trait::async_trait]
162pub trait IndicationHandler {
163 async fn handle(&self, indication: Indication) -> anyhow::Result<Indication>;
164}
165
166struct User {
167 internal_tx: Sender<UserPrimitive>,
168 internal_rx: Receiver<UserPrimitive>,
169 indication_tx: Sender<Indication>,
171 indication_handle: JoinHandle<()>,
173 history: Arc<RwLock<HashMap<TransactionID, Report>>>,
174 tokio_handle: tokio::runtime::Handle,
175}
176impl User {
177 pub fn new<
178 T: FileStore + Send + Sync + 'static,
179 I: IndicationHandler + Send + Sync + 'static,
180 >(
181 _filestore: Arc<T>,
182 indication_handler: Arc<I>,
183 ) -> Self {
184 let (internal_tx, internal_rx) = mpsc::channel::<UserPrimitive>(1);
185 let (indication_tx, mut indication_rx) = mpsc::channel::<Indication>(1000);
186 let history = Arc::new(RwLock::new(HashMap::<TransactionID, Report>::new()));
187 let auto_history = Arc::clone(&history);
188
189 let indication_handle = tokio::task::spawn(async move {
190 while let Some(indication) = indication_rx.recv().await {
191 match indication_handler.handle(indication).await {
192 Ok(indication) => {
193 debug!("Received indication: {:?}", indication);
194 if let Indication::Report(report) = &indication {
195 debug!("Received Report indication: {:?}", report);
196 if let Err(e) = auto_history
197 .write()
198 .map(|mut guard| guard.insert(report.id, report.clone()))
199 {
200 error!("Failed to update history: {:?}", e);
201 }
202 }
203 }
204 _ => continue,
205 }
206 }
207 });
208
209 Self {
210 internal_tx,
211 internal_rx,
212 indication_tx,
213 indication_handle,
214 history,
215 tokio_handle: runtime::Handle::current(),
216 }
217 }
218
219 fn split(self) -> UserSplit {
220 let User {
221 internal_tx,
222 internal_rx,
223 indication_tx,
224 indication_handle,
225 history,
226 tokio_handle,
227 } = self;
228 (
229 UserHalf {
230 internal_tx,
231 _indication_handle: indication_handle,
232 history,
233 tokio_handle,
234 },
235 internal_rx,
236 indication_tx,
237 )
238 }
239}
240
241#[allow(clippy::too_many_arguments)]
242pub async fn create_daemon<
243 T: FileStore + Sync + Send + 'static,
244 I: IndicationHandler + Sync + Send + 'static,
245>(
246 filestore: Arc<T>,
247 indication_handler: Arc<I>,
248 transport_map: HashMap<Vec<EntityID>, Box<dyn PDUTransport + Send>>,
249 entity_config: EntityConfig,
250) -> (UserHalf, JoD<'static, Result<(), String>>) {
251 let filestore = filestore.clone();
252 let user = User::new(filestore.clone(), indication_handler);
253 let (userhalf, daemonhalf, indication_tx) = user.split();
254
255 let mut daemon = Daemon::new(
256 EntityID::from(entity_config.local_entity_id),
257 TransactionSeqNum::from(0_u32),
258 transport_map,
259 filestore,
260 HashMap::from([
261 (
262 EntityID::from(entity_config.local_entity_id),
263 entity_config.clone(),
264 ),
265 (
266 EntityID::from(entity_config.remote_entity_id),
267 entity_config.clone(),
268 ),
269 ]),
270 entity_config.clone(),
271 daemonhalf,
272 indication_tx,
273 );
274
275 let handle = tokio::task::spawn(async move {
276 daemon
277 .manage_transactions()
278 .await
279 .map_err(|e| e.to_string())?;
280
281 Ok(())
282 });
283 info!("Daemon created.");
284 (userhalf, JoD::from(handle))
285}