pass_it_on/endpoints/
file.rs

1//! Regular file [`Endpoint`] and [`EndpointConfig`] implementation
2//!
3//! # Configuration Example
4//! ```toml
5//! [[server.endpoint]]
6//! type = "file"
7//! path = 'path/to/file_endpoint.txt'
8//! notifications = ["notification_id1", "notification_id2"]
9//! ```
10
11use crate::endpoints::{Endpoint, EndpointConfig};
12use crate::notifications::{Key, ValidatedNotification};
13use crate::{Error};
14use async_trait::async_trait;
15use tracing::{info, warn};
16use serde::Deserialize;
17use std::any::Any;
18use std::collections::{HashMap, HashSet};
19use std::path::{Path, PathBuf};
20use tokio::fs::OpenOptions;
21use tokio::io::{AsyncWriteExt, BufWriter};
22use tokio::sync::{broadcast, watch};
23
/// Newline byte sequence used as the line terminator for written messages.
const LINE_FEED: &[u8] = b"\n";
25
26/// Data structure to represent the regular file [`EndpointConfig`].
27#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)]
28pub(crate) struct FileConfigFile {
29    path: String,
30    notifications: Vec<String>,
31}
32
/// Regular file [`Endpoint`]: a target file path plus the notification names tied to it.
#[derive(Clone, Debug)]
pub struct FileEndpoint {
    path: PathBuf,
    notifications: Vec<String>,
}

impl FileEndpoint {
    /// Build a `FileEndpoint` from a path string and a list of notification names.
    ///
    /// Both arguments are copied into owned storage; no validation happens here.
    pub fn new(path: &str, notifications: &[String]) -> Self {
        Self {
            path: PathBuf::from(path),
            notifications: notifications.to_vec(),
        }
    }

    /// Borrow the path of the file this endpoint targets.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }

    /// Borrow the names of every notification associated with this endpoint.
    pub fn notifications(&self) -> &[String] {
        self.notifications.as_slice()
    }
}
57
58impl TryFrom<&FileConfigFile> for FileEndpoint {
59    type Error = Error;
60
61    fn try_from(value: &FileConfigFile) -> Result<Self, Self::Error> {
62        if value.path.is_empty() {
63            return Err(Error::InvalidEndpointConfiguration("File configuration path is blank".to_string()));
64        }
65
66        if value.notifications.is_empty() {
67            return Err(Error::InvalidEndpointConfiguration(
68                "File configuration has no notifications setup".to_string(),
69            ));
70        }
71
72        Ok(FileEndpoint::new(value.path.as_str(), &value.notifications))
73    }
74}
75
76#[typetag::deserialize(name = "file")]
77impl EndpointConfig for FileConfigFile {
78    fn to_endpoint(&self) -> Result<Box<dyn Endpoint + Send>, Error> {
79        Ok(Box::new(FileEndpoint::try_from(self)?))
80    }
81}
82
83#[async_trait]
84impl Endpoint for FileEndpoint {
85    async fn notify(
86        &self,
87        endpoint_rx: broadcast::Receiver<ValidatedNotification>,
88        shutdown: watch::Receiver<bool>,
89    ) -> Result<(), Error> {
90        let path = self.path().clone();
91        info!("Setting up Endpoint: File -> {}", path.to_str().unwrap_or_default());
92        tokio::spawn(async move { write_file(path, endpoint_rx, shutdown).await });
93        Ok(())
94    }
95
96    fn generate_keys(&self, hash_key: &Key) -> HashMap<String, HashSet<Key>> {
97        let keys: HashSet<Key> = self
98            .notifications()
99            .iter()
100            .map(|notification_name| Key::generate(notification_name.as_str(), hash_key))
101            .collect();
102
103        let mut map = HashMap::new();
104        map.insert("".to_string(), keys);
105        map
106    }
107
108    fn as_any(&self) -> &dyn Any {
109        self
110    }
111}
112
113async fn write_file<P: AsRef<Path>>(
114    path: P,
115    endpoint_rx: broadcast::Receiver<ValidatedNotification>,
116    shutdown: watch::Receiver<bool>,
117) -> Result<(), Error> {
118    let mut rx = endpoint_rx.resubscribe();
119    let mut shutdown_rx = shutdown.clone();
120
121    let file = OpenOptions::new().read(true).append(true).create(true).open(path.as_ref()).await?;
122    let mut file = BufWriter::new(file);
123    loop {
124        tokio::select! {
125            received = rx.recv() => {
126                if let Ok(message) = received {
127                    let line = [message.message().text().as_bytes(), LINE_FEED].concat();
128                    match file.write(line.as_slice()).await {
129                        Ok(_) => (),
130                        Err(e) => warn!("{}", e)
131                    }
132
133                    match file.flush().await {
134                        Ok(_) => (),
135                        Err(e) => warn!("{}", e),
136                    };
137                }
138            }
139
140            _ = shutdown_rx.changed() => {
141                break;
142            }
143        }
144    }
145
146    file.shutdown().await?;
147    Ok(())
148}