wmjtyd_libstock/file/
writer.rs1use std::{
5 fmt::Display,
6 path::{Path, PathBuf},
7};
8
9use flume::{Receiver, Sender};
10use tokio::{fs::OpenOptions, task::JoinHandle};
11use tracing::Instrument;
12use uuid::Uuid;
13
14use super::{
15 datadir::{get_data_directory, get_ident_path},
16 ident::get_ident,
17 timestamp::get_timestamp,
18};
19
/// A named chunk of bytes queued for writing by a [`DataWriter`].
///
/// `Eq` is derived alongside `Hash` so that entries can be used as keys in
/// `HashSet`/`HashMap`; both fields (`String`, `Vec<u8>`) have total equality.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub struct DataEntry {
    /// Name of the entry. The writer daemon passes it to `get_ident` together
    /// with the current timestamp to derive the destination path.
    pub filename: String,
    /// Raw payload bytes to append to the destination file.
    pub data: Vec<u8>,
}
42
43impl Display for DataEntry {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(f, "{} ({} bytes)", self.filename, self.data.len())
46 }
47}
48
49#[non_exhaustive]
51enum WriterAction {
52 Stop,
54
55 FileWrite(DataEntry),
57}
58
59pub struct DataWriter {
81 writer_id: Uuid,
82
83 sender: Sender<WriterAction>,
84 receiver: Receiver<WriterAction>,
85}
86
87impl DataWriter {
88 pub fn new() -> DataWriter {
90 DataWriter::default()
91 }
92
93 pub fn add(&mut self, data: DataEntry) -> WriteResult<()> {
112 tracing::info!(
113 "Adding data {data} to writer {writer}",
114 writer = self.writer_id
115 );
116
117 self.sender
118 .send(WriterAction::FileWrite(data))
119 .map_err(|_| WriteError::PushChannelFailed)
120 }
121
122 pub async fn start(&self) -> WriteResult<JoinHandle<()>> {
124 let span = tracing::info_span!(
125 "DataWriter::start",
126 id = self.writer_id.to_string().as_str()
127 );
128
129 async move {
130 let data_dir = Self::get_data_dir();
131 Self::create_data_dir(data_dir.as_path()).await?;
132
133 let receiver = self.receiver.clone();
134
135 tracing::info!("Starting daemon…");
136 let span = tracing::info_span!("daemon");
137 Ok(tokio::task::spawn(
138 async move {
139 loop {
140 let task = async {
141 let action = receiver
142 .recv_async()
143 .await
144 .map_err(DaemonError::RecvActionFailed)?;
145
146 Self::process_action(action).await
147 };
148
149 if let Err(e) = task.await {
150 match e {
151 DaemonError::StopDaemon => {
152 tracing::trace!("Received the forwarded “StopDaemon” request.");
153 break;
154 }
155 _ => {
156 tracing::error!("Error happened: {e}; skipping.");
157 continue;
158 }
159 }
160 }
161 }
162 }
163 .instrument(span),
164 ))
165 }
166 .instrument(span)
167 .await
168 }
169
170 pub fn stop(&self) -> WriteResult<()> {
172 tracing::info!("Stopping writer {writer}…", writer = self.writer_id);
173
174 self.sender
175 .send(WriterAction::Stop)
176 .map_err(|_| WriteError::PushChannelFailed)
177 }
178
179 fn get_data_dir() -> PathBuf {
180 let data_dir = get_data_directory();
181 tracing::info!("The files will be saved in: {}", data_dir.display());
182
183 data_dir
184 }
185
186 async fn create_data_dir(data_dir: &Path) -> WriteResult<()> {
187 if data_dir.exists() {
188 tracing::debug!("The data directory has been created. Ignoring.");
189 } else {
190 tracing::info!("Creating the data directory…");
191 tokio::fs::create_dir_all(data_dir)
192 .await
193 .map_err(WriteError::DataDirCreationFailed)?;
194 }
195
196 Ok(())
197 }
198
199 async fn process_action(action: WriterAction) -> Result<(), DaemonError> {
200 match action {
201 WriterAction::FileWrite(DataEntry { filename, data }) => {
202 tracing::trace!("Received a data entry. Processing…");
203
204 let identifier = get_ident(&filename, &get_timestamp());
206
207 tracing::debug!("Writing ”{filename}“, data_len: {len}…", len = data.len());
209 let path_to_write = get_ident_path(&identifier);
210 write_content(path_to_write, data.as_slice()).await?;
211 }
212 WriterAction::Stop => {
213 tracing::debug!("Daemon has received stop signal. Exiting.");
214 return Err(DaemonError::StopDaemon);
215 }
216 }
217
218 Ok(())
219 }
220}
221
222impl Default for DataWriter {
223 fn default() -> Self {
224 let (sender, receiver) = flume::unbounded();
225
226 Self {
227 writer_id: Uuid::new_v4(),
229 sender,
230 receiver,
231 }
232 }
233}
234
235async fn write_content(path: impl AsRef<std::path::Path>, data: &[u8]) -> WriteResult<()> {
236 use tokio::io::AsyncWriteExt;
237
238 let mut file = OpenOptions::new()
239 .create(true)
240 .append(true)
241 .open(path)
242 .await
243 .map_err(WriteError::FileOpenFailed)?;
244
245 let data_len = data.len().to_be_bytes();
247 file.write_all(&data_len)
248 .await
249 .map_err(WriteError::LengthWriteFailed)?;
250
251 file.write_all(data)
253 .await
254 .map_err(WriteError::DataWriteFailed)?;
255
256 Ok(())
257}
258
259#[derive(Debug, thiserror::Error)]
260pub enum WriteError {
261 #[error("failed to create data directory: {0}")]
262 DataDirCreationFailed(tokio::io::Error),
263
264 #[error("failed to push an entry to channel")]
265 PushChannelFailed,
266
267 #[error("failed to open file: {0}")]
268 FileOpenFailed(tokio::io::Error),
269
270 #[error("failed to write length to file: {0}")]
271 LengthWriteFailed(tokio::io::Error),
272
273 #[error("failed to write data to file: {0}")]
274 DataWriteFailed(tokio::io::Error),
275}
276
277#[derive(Debug, thiserror::Error)]
278enum DaemonError {
279 #[error("writer error: {0}")]
280 WriterError(#[from] WriteError),
281
282 #[error("failed to receive action: {0}")]
283 RecvActionFailed(flume::RecvError),
284
285 #[error("received stop signal")]
286 StopDaemon,
287}
288
289pub type WriteResult<T> = Result<T, WriteError>;