pass_it_on/endpoints/
file.rs1use crate::endpoints::{Endpoint, EndpointConfig};
12use crate::notifications::{Key, ValidatedNotification};
13use crate::{Error};
14use async_trait::async_trait;
15use tracing::{info, warn};
16use serde::Deserialize;
17use std::any::Any;
18use std::collections::{HashMap, HashSet};
19use std::path::{Path, PathBuf};
20use tokio::fs::OpenOptions;
21use tokio::io::{AsyncWriteExt, BufWriter};
22use tokio::sync::{broadcast, watch};
23
24const LINE_FEED: &[u8] = "\n".as_bytes();
25
26#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)]
28pub(crate) struct FileConfigFile {
29 path: String,
30 notifications: Vec<String>,
31}
32
33#[derive(Debug, Clone)]
35pub struct FileEndpoint {
36 path: PathBuf,
37 notifications: Vec<String>,
38}
39
40impl FileEndpoint {
41 pub fn new(path: &str, notifications: &[String]) -> Self {
43 let path = PathBuf::from(path);
44 let notifications = notifications.into();
45 Self { path, notifications }
46 }
47 pub fn path(&self) -> &PathBuf {
49 &self.path
50 }
51
52 pub fn notifications(&self) -> &[String] {
54 &self.notifications
55 }
56}
57
58impl TryFrom<&FileConfigFile> for FileEndpoint {
59 type Error = Error;
60
61 fn try_from(value: &FileConfigFile) -> Result<Self, Self::Error> {
62 if value.path.is_empty() {
63 return Err(Error::InvalidEndpointConfiguration("File configuration path is blank".to_string()));
64 }
65
66 if value.notifications.is_empty() {
67 return Err(Error::InvalidEndpointConfiguration(
68 "File configuration has no notifications setup".to_string(),
69 ));
70 }
71
72 Ok(FileEndpoint::new(value.path.as_str(), &value.notifications))
73 }
74}
75
76#[typetag::deserialize(name = "file")]
77impl EndpointConfig for FileConfigFile {
78 fn to_endpoint(&self) -> Result<Box<dyn Endpoint + Send>, Error> {
79 Ok(Box::new(FileEndpoint::try_from(self)?))
80 }
81}
82
83#[async_trait]
84impl Endpoint for FileEndpoint {
85 async fn notify(
86 &self,
87 endpoint_rx: broadcast::Receiver<ValidatedNotification>,
88 shutdown: watch::Receiver<bool>,
89 ) -> Result<(), Error> {
90 let path = self.path().clone();
91 info!("Setting up Endpoint: File -> {}", path.to_str().unwrap_or_default());
92 tokio::spawn(async move { write_file(path, endpoint_rx, shutdown).await });
93 Ok(())
94 }
95
96 fn generate_keys(&self, hash_key: &Key) -> HashMap<String, HashSet<Key>> {
97 let keys: HashSet<Key> = self
98 .notifications()
99 .iter()
100 .map(|notification_name| Key::generate(notification_name.as_str(), hash_key))
101 .collect();
102
103 let mut map = HashMap::new();
104 map.insert("".to_string(), keys);
105 map
106 }
107
108 fn as_any(&self) -> &dyn Any {
109 self
110 }
111}
112
113async fn write_file<P: AsRef<Path>>(
114 path: P,
115 endpoint_rx: broadcast::Receiver<ValidatedNotification>,
116 shutdown: watch::Receiver<bool>,
117) -> Result<(), Error> {
118 let mut rx = endpoint_rx.resubscribe();
119 let mut shutdown_rx = shutdown.clone();
120
121 let file = OpenOptions::new().read(true).append(true).create(true).open(path.as_ref()).await?;
122 let mut file = BufWriter::new(file);
123 loop {
124 tokio::select! {
125 received = rx.recv() => {
126 if let Ok(message) = received {
127 let line = [message.message().text().as_bytes(), LINE_FEED].concat();
128 match file.write(line.as_slice()).await {
129 Ok(_) => (),
130 Err(e) => warn!("{}", e)
131 }
132
133 match file.flush().await {
134 Ok(_) => (),
135 Err(e) => warn!("{}", e),
136 };
137 }
138 }
139
140 _ = shutdown_rx.changed() => {
141 break;
142 }
143 }
144 }
145
146 file.shutdown().await?;
147 Ok(())
148}