cfdp_simplified/daemon/
user.rs

1use std::{
2    collections::HashMap,
3    io::{Error as IoError, ErrorKind},
4    marker::PhantomData,
5    str::FromStr,
6    sync::{Arc, PoisonError, RwLock, RwLockReadGuard},
7    time::Duration,
8};
9
10use camino::Utf8PathBuf;
11use log::{debug, error, info};
12use tokio::{
13    runtime::{self},
14    sync::{
15        mpsc::{self, Receiver, Sender},
16        oneshot,
17    },
18    task::JoinHandle,
19};
20
21use super::{transport::PDUTransport, Daemon};
22use crate::{
23    daemon::{EntityConfig, Indication, PutRequest, Report, UserPrimitive},
24    filestore::{FileStore, NativeFileStore},
25    pdu::{EntityID, TransactionSeqNum},
26    transaction::TransactionID,
27};
28
/// Owns a collection of tokio task handles.
///
/// The `Drop` implementation for this type aborts every handle still stored
/// in `handle`, so the tasks are cancelled when the value goes out of scope.
///
/// The `'a` lifetime is carried only by the zero-sized `phantom` marker; no
/// borrowed data is stored in the struct.
#[derive(Debug)]
pub struct JoD<'a, T> {
    /// Handles of the tasks owned by this value.
    pub handle: Vec<JoinHandle<T>>,
    /// Zero-sized marker that makes the otherwise unused `'a` parameter legal.
    phantom: PhantomData<&'a ()>,
}
34#[allow(clippy::needless_lifetimes)]
35impl<'a, T> From<JoinHandle<T>> for JoD<'a, T> {
36    fn from(input: JoinHandle<T>) -> Self {
37        Self {
38            handle: vec![input],
39            phantom: PhantomData,
40        }
41    }
42}
43
44#[allow(clippy::needless_lifetimes)]
45impl<'a, T> Drop for JoD<'a, T> {
46    fn drop(&mut self) {
47        for handle in self.handle.drain(..) {
48            handle.abort();
49        }
50    }
51}
52
/// User-facing side of the user/daemon pair produced by `User::split`.
///
/// Requests are forwarded to the daemon as [`UserPrimitive`] messages, and
/// reports collected by the indication task can be read back from `history`.
pub struct UserHalf {
    /// Sending side of the channel that carries [`UserPrimitive`] requests.
    internal_tx: Sender<UserPrimitive>,
    /// Handle of the indication-processing task spawned in `User::new`;
    /// stored here but not otherwise used by this type.
    _indication_handle: JoinHandle<()>,
    /// Reports keyed by transaction ID, shared with the indication task which
    /// inserts an entry whenever it sees an `Indication::Report`.
    history: Arc<RwLock<HashMap<TransactionID, Report>>>,
    /// Handle of the runtime that was current when the `User` was built;
    /// `cancel` uses it to drive its send from synchronous code.
    tokio_handle: tokio::runtime::Handle,
}
59
60impl UserHalf {
61    #[allow(unused)]
62    pub async fn put(&self, request: PutRequest) -> Result<TransactionID, IoError> {
63        let (put_send, put_recv) = oneshot::channel();
64        let primitive = UserPrimitive::Put(request, put_send);
65
66        self.internal_tx
67            .send_timeout(primitive, Duration::from_secs(1))
68            .await
69            .map_err(|_| {
70                IoError::new(
71                    ErrorKind::ConnectionReset,
72                    "1 Daemon Half of User disconnected.",
73                )
74            })?;
75        put_recv.await.map_err(|_| {
76            IoError::new(
77                ErrorKind::ConnectionReset,
78                "1 Daemon Half of User disconnected.",
79            )
80        })
81    }
82
83    #[allow(unused)]
84    pub fn cancel(&self, transaction: TransactionID) -> Result<(), IoError> {
85        self.tokio_handle.block_on(async {
86            let primitive = UserPrimitive::Cancel(transaction);
87            self.internal_tx.send(primitive).await.map_err(|_| {
88                IoError::new(
89                    ErrorKind::ConnectionReset,
90                    "Daemon Half of User disconnected.",
91                )
92            })
93        })
94    }
95
96    #[allow(unused)]
97    pub async fn report(&self, transaction: TransactionID) -> Result<Option<Report>, IoError> {
98        let (report_tx, mut report_rx) = oneshot::channel();
99        let primitive = UserPrimitive::Report(transaction, report_tx);
100
101        self.internal_tx
102            .send_timeout(primitive, Duration::from_secs(1))
103            .await
104            .map_err(|err| {
105                IoError::new(
106                    ErrorKind::ConnectionReset,
107                    format!("Daemon Half of User disconnected on send: {err}"),
108                )
109            })?;
110        let response = match report_rx.try_recv() {
111            Ok(report) => Some(report),
112            // if the channel disconnects because the transaction is finished then just get from history.
113            Err(_) => self.history.read().unwrap().get(&transaction).cloned(),
114        };
115        Ok(response)
116    }
117
118    #[allow(unused)]
119    pub fn history(
120        &self,
121    ) -> Result<Vec<Report>, PoisonError<RwLockReadGuard<'_, HashMap<TransactionID, Report>>>> {
122        self.history.read().map(|history| {
123            let mut histories: Vec<Report> = Vec::with_capacity(history.len());
124            histories.extend(history.values().cloned());
125            histories
126        })
127    }
128
129    #[allow(unused)]
130    pub fn get_local_report(&self, transaction: TransactionID) -> Result<Option<Report>, IoError> {
131        self.history
132            .read()
133            .map(|history| history.get(&transaction).cloned())
134            .map_err(|e| IoError::new(ErrorKind::Other, e.to_string()))
135    }
136}
137
/// Bundle returned by [`static_assets`]: a native filestore together with the
/// tokio runtime that was built alongside it.
pub struct StaticAssets {
    /// Filestore created from the path passed to [`static_assets`].
    pub filestore: Arc<NativeFileStore>,
    /// Runtime owned by this bundle; it is stored but never read through
    /// this field.
    _tokio_runtime: tokio::runtime::Runtime,
}
142
143pub fn static_assets(filestore_path: &str) -> StaticAssets {
144    let utf8_path = Utf8PathBuf::from_str(filestore_path).unwrap();
145    let filestore = Arc::new(NativeFileStore::new(&utf8_path));
146    let _tokio_runtime = tokio::runtime::Builder::new_multi_thread()
147        .worker_threads(1)
148        .enable_io()
149        .enable_time()
150        .build()
151        .unwrap();
152
153    StaticAssets {
154        filestore,
155        _tokio_runtime,
156    }
157}
158
/// Result of `User::split`: the user-facing half, the receiver for user
/// primitives, and the sender on which indications are published.
type UserSplit = (UserHalf, Receiver<UserPrimitive>, Sender<Indication>);
160
/// Asynchronous hook invoked for every [`Indication`] read from the
/// indication channel by the task spawned in `User::new`.
#[async_trait::async_trait]
pub trait IndicationHandler {
    /// Processes one indication and returns the indication to continue with.
    ///
    /// In this file, a returned `Indication::Report` is recorded in the
    /// user's history; when an error is returned nothing is recorded for
    /// that indication.
    async fn handle(&self, indication: Indication) -> anyhow::Result<Indication>;
}
165
/// Unsplit user/daemon pair: holds both ends of the primitive channel, the
/// indication sender, and the shared report history until `split` hands the
/// pieces out.
struct User {
    /// Sending side of the user-primitive channel (ends up in `UserHalf`).
    internal_tx: Sender<UserPrimitive>,
    /// Receiving side of the user-primitive channel (returned by `split`).
    internal_rx: Receiver<UserPrimitive>,
    // Channel on which indications are sent to the listener task.
    indication_tx: Sender<Indication>,
    // Handle of the tokio task that listens for indications.
    indication_handle: JoinHandle<()>,
    /// Reports keyed by transaction ID, filled in by the listener task.
    history: Arc<RwLock<HashMap<TransactionID, Report>>>,
    /// Handle of the runtime that was current when `new` was called.
    tokio_handle: tokio::runtime::Handle,
}
176impl User {
177    pub fn new<
178        T: FileStore + Send + Sync + 'static,
179        I: IndicationHandler + Send + Sync + 'static,
180    >(
181        _filestore: Arc<T>,
182        indication_handler: Arc<I>,
183    ) -> Self {
184        let (internal_tx, internal_rx) = mpsc::channel::<UserPrimitive>(1);
185        let (indication_tx, mut indication_rx) = mpsc::channel::<Indication>(1000);
186        let history = Arc::new(RwLock::new(HashMap::<TransactionID, Report>::new()));
187        let auto_history = Arc::clone(&history);
188
189        let indication_handle = tokio::task::spawn(async move {
190            while let Some(indication) = indication_rx.recv().await {
191                match indication_handler.handle(indication).await {
192                    Ok(indication) => {
193                        debug!("Received indication: {:?}", indication);
194                        if let Indication::Report(report) = &indication {
195                            debug!("Received Report indication: {:?}", report);
196                            if let Err(e) = auto_history
197                                .write()
198                                .map(|mut guard| guard.insert(report.id, report.clone()))
199                            {
200                                error!("Failed to update history: {:?}", e);
201                            }
202                        }
203                    }
204                    _ => continue,
205                }
206            }
207        });
208
209        Self {
210            internal_tx,
211            internal_rx,
212            indication_tx,
213            indication_handle,
214            history,
215            tokio_handle: runtime::Handle::current(),
216        }
217    }
218
219    fn split(self) -> UserSplit {
220        let User {
221            internal_tx,
222            internal_rx,
223            indication_tx,
224            indication_handle,
225            history,
226            tokio_handle,
227        } = self;
228        (
229            UserHalf {
230                internal_tx,
231                _indication_handle: indication_handle,
232                history,
233                tokio_handle,
234            },
235            internal_rx,
236            indication_tx,
237        )
238    }
239}
240
241#[allow(clippy::too_many_arguments)]
242pub async fn create_daemon<
243    T: FileStore + Sync + Send + 'static,
244    I: IndicationHandler + Sync + Send + 'static,
245>(
246    filestore: Arc<T>,
247    indication_handler: Arc<I>,
248    transport_map: HashMap<Vec<EntityID>, Box<dyn PDUTransport + Send>>,
249    entity_config: EntityConfig,
250) -> (UserHalf, JoD<'static, Result<(), String>>) {
251    let filestore = filestore.clone();
252    let user = User::new(filestore.clone(), indication_handler);
253    let (userhalf, daemonhalf, indication_tx) = user.split();
254
255    let mut daemon = Daemon::new(
256        EntityID::from(entity_config.local_entity_id),
257        TransactionSeqNum::from(0_u32),
258        transport_map,
259        filestore,
260        HashMap::from([
261            (
262                EntityID::from(entity_config.local_entity_id),
263                entity_config.clone(),
264            ),
265            (
266                EntityID::from(entity_config.remote_entity_id),
267                entity_config.clone(),
268            ),
269        ]),
270        entity_config.clone(),
271        daemonhalf,
272        indication_tx,
273    );
274
275    let handle = tokio::task::spawn(async move {
276        daemon
277            .manage_transactions()
278            .await
279            .map_err(|e| e.to_string())?;
280
281        Ok(())
282    });
283    info!("Daemon created.");
284    (userhalf, JoD::from(handle))
285}