wmjtyd_libstock/file/
writer.rs

//! The writer daemon, which writes data and places the files automatically
//! so that callers do not have to manage the paths themselves.
3
4use std::{
5    fmt::Display,
6    path::{Path, PathBuf},
7};
8
9use flume::{Receiver, Sender};
10use tokio::{fs::OpenOptions, task::JoinHandle};
11use tracing::Instrument;
12use uuid::Uuid;
13
14use super::{
15    datadir::{get_data_directory, get_ident_path},
16    ident::get_ident,
17    timestamp::get_timestamp,
18};
19
/// An owned data entry to send to a [`DataWriter`].
///
/// # Example
///
/// ```
/// use wmjtyd_libstock::file::writer::DataEntry;
///
/// let de = DataEntry {
///    filename: "test".to_string(),
///    data: b"OwO".to_vec(),
/// };
/// let de_clone = de.clone();
///
/// assert_eq!(de, de_clone);
/// ```
// `Eq` accompanies `PartialEq` + `Hash` so that entries can be used as
// `HashSet`/`HashMap` keys; both fields (`String`, `Vec<u8>`) are `Eq`.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub struct DataEntry {
    /// The file name to write as.
    pub filename: String,
    /// The data to write.
    pub data: Vec<u8>,
}
42
43impl Display for DataEntry {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(f, "{} ({} bytes)", self.filename, self.data.len())
46    }
47}
48
/// The action to pass to the writer daemon channel.
///
/// Values are sent by [`DataWriter::add`] and [`DataWriter::stop`], and are
/// consumed by the daemon task spawned in [`DataWriter::start`].
#[non_exhaustive]
enum WriterAction {
    /// Stop daemon.
    Stop,

    /// Send [`DataEntry`] to the daemon to write.
    FileWrite(DataEntry),
}
58
/// The writer daemon, which writes data and places the files automatically
/// so that callers do not have to manage the paths; the writing happens
/// asynchronously, behind a synchronous `.add()` API.
///
/// # Example
///
/// ```
/// use wmjtyd_libstock::file::writer::{DataWriter, DataEntry, WriteError};
///
/// # async fn example() -> Result<(), WriteError> {
/// let mut writer = DataWriter::new();
/// // `start()` is async: the daemon is only spawned once it is awaited.
/// let _daemon = writer.start().await?;
///
/// writer.add(DataEntry {
///    // It will be saved to './record/test20190101.csv'
///    // according to our definition in `super::datadir`.
///    filename: "test".to_string(),
///
///    // `.to_vec()` is needed to write it asynchronously.
///    data: b"OwO".to_vec(),
/// })?;
/// # Ok(())
/// # }
/// ```
pub struct DataWriter {
    // Random identifier of this writer; only used in log messages and spans.
    writer_id: Uuid,

    // Sending half of the action channel, used by `add()` and `stop()`.
    sender: Sender<WriterAction>,
    // Receiving half of the action channel; `start()` clones it into the
    // spawned daemon task.
    receiver: Receiver<WriterAction>,
}
86
87impl DataWriter {
88    /// Create a new [`DataWriter`].
89    pub fn new() -> DataWriter {
90        DataWriter::default()
91    }
92
93    /// Push a [`DataEntry`] to write.
94    ///
95    /// # Example
96    ///
97    /// ```
98    /// use wmjtyd_libstock::file::writer::{DataWriter, DataEntry};
99    ///
100    /// let mut writer = DataWriter::new();
101    ///
102    /// writer.add(DataEntry {
103    ///    // It will be saved to './record/test20190101.csv'
104    ///    // according to our definition in `super::datadir`.
105    ///    filename: "test".to_string(),
106    ///
107    ///    // `.to_vec()` is needed to write it asynchoronously.
108    ///    data: b"OwO".to_vec(),
109    /// });
110    /// ```
111    pub fn add(&mut self, data: DataEntry) -> WriteResult<()> {
112        tracing::info!(
113            "Adding data {data} to writer {writer}",
114            writer = self.writer_id
115        );
116
117        self.sender
118            .send(WriterAction::FileWrite(data))
119            .map_err(|_| WriteError::PushChannelFailed)
120    }
121
122    /// Spawn the writer daemon.
123    pub async fn start(&self) -> WriteResult<JoinHandle<()>> {
124        let span = tracing::info_span!(
125            "DataWriter::start",
126            id = self.writer_id.to_string().as_str()
127        );
128
129        async move {
130            let data_dir = Self::get_data_dir();
131            Self::create_data_dir(data_dir.as_path()).await?;
132
133            let receiver = self.receiver.clone();
134
135            tracing::info!("Starting daemon…");
136            let span = tracing::info_span!("daemon");
137            Ok(tokio::task::spawn(
138                async move {
139                    loop {
140                        let task = async {
141                            let action = receiver
142                                .recv_async()
143                                .await
144                                .map_err(DaemonError::RecvActionFailed)?;
145
146                            Self::process_action(action).await
147                        };
148
149                        if let Err(e) = task.await {
150                            match e {
151                                DaemonError::StopDaemon => {
152                                    tracing::trace!("Received the forwarded “StopDaemon” request.");
153                                    break;
154                                }
155                                _ => {
156                                    tracing::error!("Error happened: {e}; skipping.");
157                                    continue;
158                                }
159                            }
160                        }
161                    }
162                }
163                .instrument(span),
164            ))
165        }
166        .instrument(span)
167        .await
168    }
169
170    /// Stop the writer daemon.
171    pub fn stop(&self) -> WriteResult<()> {
172        tracing::info!("Stopping writer {writer}…", writer = self.writer_id);
173
174        self.sender
175            .send(WriterAction::Stop)
176            .map_err(|_| WriteError::PushChannelFailed)
177    }
178
179    fn get_data_dir() -> PathBuf {
180        let data_dir = get_data_directory();
181        tracing::info!("The files will be saved in: {}", data_dir.display());
182
183        data_dir
184    }
185
186    async fn create_data_dir(data_dir: &Path) -> WriteResult<()> {
187        if data_dir.exists() {
188            tracing::debug!("The data directory has been created. Ignoring.");
189        } else {
190            tracing::info!("Creating the data directory…");
191            tokio::fs::create_dir_all(data_dir)
192                .await
193                .map_err(WriteError::DataDirCreationFailed)?;
194        }
195
196        Ok(())
197    }
198
199    async fn process_action(action: WriterAction) -> Result<(), DaemonError> {
200        match action {
201            WriterAction::FileWrite(DataEntry { filename, data }) => {
202                tracing::trace!("Received a data entry. Processing…");
203
204                // Get the timestamp, and get the identifier.
205                let identifier = get_ident(&filename, &get_timestamp());
206
207                // Write file to the specified path.
208                tracing::debug!("Writing ”{filename}“, data_len: {len}…", len = data.len());
209                let path_to_write = get_ident_path(&identifier);
210                write_content(path_to_write, data.as_slice()).await?;
211            }
212            WriterAction::Stop => {
213                tracing::debug!("Daemon has received stop signal. Exiting.");
214                return Err(DaemonError::StopDaemon);
215            }
216        }
217
218        Ok(())
219    }
220}
221
222impl Default for DataWriter {
223    fn default() -> Self {
224        let (sender, receiver) = flume::unbounded();
225
226        Self {
227            // Generate a writer ID for debugging.
228            writer_id: Uuid::new_v4(),
229            sender,
230            receiver,
231        }
232    }
233}
234
235async fn write_content(path: impl AsRef<std::path::Path>, data: &[u8]) -> WriteResult<()> {
236    use tokio::io::AsyncWriteExt;
237
238    let mut file = OpenOptions::new()
239        .create(true)
240        .append(true)
241        .open(path)
242        .await
243        .map_err(WriteError::FileOpenFailed)?;
244
245    // First, write length to file.
246    let data_len = data.len().to_be_bytes();
247    file.write_all(&data_len)
248        .await
249        .map_err(WriteError::LengthWriteFailed)?;
250
251    // Then, write data to file.
252    file.write_all(data)
253        .await
254        .map_err(WriteError::DataWriteFailed)?;
255
256    Ok(())
257}
258
/// The errors that the public writer API and the file-writing helper can
/// report.
#[derive(Debug, thiserror::Error)]
pub enum WriteError {
    /// Creating the data directory failed; carries the underlying I/O error.
    #[error("failed to create data directory: {0}")]
    DataDirCreationFailed(tokio::io::Error),

    /// Sending an action to the daemon channel failed.
    #[error("failed to push an entry to channel")]
    PushChannelFailed,

    /// Opening (or creating) the target file failed; carries the underlying
    /// I/O error.
    #[error("failed to open file: {0}")]
    FileOpenFailed(tokio::io::Error),

    /// Writing the length prefix of a record failed; carries the underlying
    /// I/O error.
    #[error("failed to write length to file: {0}")]
    LengthWriteFailed(tokio::io::Error),

    /// Writing the data of a record failed; carries the underlying I/O error.
    #[error("failed to write data to file: {0}")]
    DataWriteFailed(tokio::io::Error),
}
276
/// The outcomes that make the daemon loop deviate from "action processed
/// successfully". Internal to this module.
#[derive(Debug, thiserror::Error)]
enum DaemonError {
    /// A write failed; converted automatically from [`WriteError`] by `?`.
    #[error("writer error: {0}")]
    WriterError(#[from] WriteError),

    /// Receiving the next action from the channel failed.
    #[error("failed to receive action: {0}")]
    RecvActionFailed(flume::RecvError),

    /// Not a real failure: returned when a stop action is processed, so that
    /// the daemon loop knows it has to exit.
    #[error("received stop signal")]
    StopDaemon,
}
288
/// Shorthand for a [`Result`] whose error type is [`WriteError`].
pub type WriteResult<T> = Result<T, WriteError>;