fusio_log/
lib.rs

1pub mod error;
2pub mod fs;
3mod option;
4mod serdes;
5use std::{io::Cursor, marker::PhantomData, sync::Arc};
6
7use error::LogError;
8use fs::hash::{HashReader, HashWriter};
9pub use fusio::path::Path;
10use fusio::{buffered::BufWriter, dynamic::DynFile, fs::OpenOptions, DynFs, DynWrite};
11use futures_core::TryStream;
12use futures_util::stream;
13#[allow(unused)]
14pub use option::*;
15pub use serdes::*;
16
17pub struct Logger<T> {
18    path: Path,
19    fs: Arc<dyn DynFs>,
20    buf_writer: BufWriter<Box<dyn DynFile>>,
21    _mark: PhantomData<T>,
22}
23
24impl<T> Logger<T>
25where
26    T: Encode,
27{
28    pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
29        let fs = option.fs_option.parse()?;
30        let file = fs
31            .open_options(
32                &option.path,
33                OpenOptions::default()
34                    .read(true)
35                    .write(true)
36                    .create(true)
37                    .truncate(option.truncate),
38            )
39            .await?;
40
41        let buf_writer = BufWriter::new(file, option.buf_size);
42        Ok(Self {
43            fs,
44            buf_writer,
45            path: option.path,
46            _mark: PhantomData,
47        })
48    }
49
50    pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
51        let file = fs
52            .open_options(
53                &option.path,
54                OpenOptions::default().read(true).write(true).create(true),
55            )
56            .await?;
57
58        let buf_writer = BufWriter::new(file, option.buf_size);
59        Ok(Self {
60            fs,
61            buf_writer,
62            path: option.path,
63            _mark: PhantomData,
64        })
65    }
66}
67
68impl<T> Logger<T>
69where
70    T: Encode,
71{
72    pub async fn write_batch<'r>(
73        &mut self,
74        data: impl ExactSizeIterator<Item = &'r T>,
75    ) -> Result<(), LogError>
76    where
77        T: 'r,
78    {
79        let mut writer = HashWriter::new(&mut self.buf_writer);
80        (data.len() as u32).encode(&mut writer).await?;
81        for e in data {
82            e.encode(&mut writer)
83                .await
84                .map_err(|err| LogError::Encode {
85                    message: err.to_string(),
86                })?;
87        }
88        writer.eol().await?;
89        Ok(())
90    }
91
92    pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
93        let mut writer = HashWriter::new(&mut self.buf_writer);
94
95        1_u32.encode(&mut writer).await.unwrap();
96        data.encode(&mut writer)
97            .await
98            .map_err(|err| LogError::Encode {
99                message: err.to_string(),
100            })?;
101        writer.eol().await?;
102        Ok(())
103    }
104
105    pub async fn flush(&mut self) -> Result<(), LogError> {
106        self.buf_writer.flush().await?;
107        Ok(())
108    }
109
110    pub async fn close(&mut self) -> Result<(), LogError> {
111        self.buf_writer.close().await?;
112        Ok(())
113    }
114}
115
116impl<T> Logger<T>
117where
118    T: Decode,
119{
120    pub(crate) async fn recover(
121        option: Options,
122    ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
123        let fs = option.fs_option.parse()?;
124        let file = fs
125            .open_options(&option.path, OpenOptions::default().create(false))
126            .await?;
127
128        Ok(Box::pin(stream::try_unfold(
129            (file, 0),
130            |(mut f, mut pos)| async move {
131                let mut cursor = Cursor::new(&mut f);
132                cursor.set_position(pos);
133                let mut reader = HashReader::new(cursor);
134
135                let Ok(len) = u32::decode(&mut reader).await else {
136                    f.close().await?;
137                    return Ok(None);
138                };
139                let mut buf = Vec::with_capacity(len as usize);
140                for _ in 0..len {
141                    match T::decode(&mut reader).await {
142                        Ok(record) => {
143                            buf.push(record);
144                        }
145                        Err(err) => {
146                            return Err(LogError::Decode {
147                                message: err.to_string(),
148                            });
149                        }
150                    }
151                }
152
153                pos += reader.position();
154                if !reader.checksum().await? {
155                    return Err(LogError::Checksum);
156                }
157                pos += size_of::<u32>() as u64;
158
159                Ok(Some((buf, (f, pos))))
160            },
161        )))
162    }
163}
164
165impl<T> Logger<T> {
166    /// Remove log file
167    pub async fn remove(self) -> Result<(), LogError> {
168        self.fs.remove(&self.path).await?;
169        Ok(())
170    }
171}
172
#[cfg(test)]
mod tests {

    use futures_util::{StreamExt, TryStreamExt};
    use tempfile::TempDir;
    use tokio::pin;

    use crate::{
        fs::{AwsCredential, SeqRead, Write},
        Decode, Encode, FsOptions, Options, Path,
    };

    /// Record type used to exercise user-defined `Encode`/`Decode` impls.
    ///
    /// `PartialEq` is derived so recovered batches can be compared as a whole,
    /// covering every field and the batch length.
    #[derive(Debug, Clone, PartialEq)]
    struct TestStruct {
        id: u64,
        name: String,
        email: Option<String>,
    }

    impl Encode for TestStruct {
        type Error = fusio::Error;

        /// Writes the fields in declaration order: `id`, `name`, `email`.
        async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
        where
            W: Write,
        {
            self.id.encode(writer).await?;
            self.name.encode(writer).await.unwrap();
            self.email.encode(writer).await.unwrap();
            Ok(())
        }

        fn size(&self) -> usize {
            self.id.size() + self.name.size() + self.email.size()
        }
    }

    impl Decode for TestStruct {
        type Error = fusio::Error;

        /// Reads the fields in the order `encode` wrote them.
        async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
        where
            R: SeqRead,
        {
            let id = u64::decode(reader).await?;
            let name = String::decode(reader).await.unwrap();
            let email = Option::<String>::decode(reader).await.unwrap();
            Ok(Self { id, name, email })
        }
    }

    /// The record written on its own via `Logger::write` in the struct tests.
    fn single_item() -> TestStruct {
        TestStruct {
            id: 100,
            name: "Name".to_string(),
            email: None,
        }
    }

    /// Fifty distinct records written as one batch in the struct tests.
    fn test_items() -> Vec<TestStruct> {
        (0..50)
            .map(|i| TestStruct {
                id: i,
                name: format!("Tonbo{}", i),
                email: Some(format!("fusio{}@tonboio.com", i)),
            })
            .collect()
    }

    /// Round-trips a single write and two batches of `u8` through a local file.
    #[tokio::test]
    async fn test_write_u8() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("u8");

        {
            let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
            logger.write(&1).await.unwrap();
            logger.write_batch([2, 3, 4].iter()).await.unwrap();
            logger
                .write_batch([2, 3, 4, 5, 1, 255].iter())
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
            let stream = Options::new(path)
                .recover::<u8>()
                .await
                .unwrap()
                .into_stream();
            pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                assert_eq!(expected[i], res.unwrap());
                i += 1;
            }
            assert_eq!(i, expected.len());
        }
    }

    /// Round-trips a single record and a batch of records through a local file.
    #[tokio::test]
    async fn test_write_struct() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("struct");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.write(&single_item()).await.unwrap();
            logger.write_batch(test_items().iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected = [vec![single_item()], test_items()];
            let stream = Options::new(path)
                .recover::<TestStruct>()
                .await
                .unwrap()
                .into_stream();
            pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                // Whole-batch equality: a `zip` over the records would stop at
                // the shorter side and hide missing or extra entries.
                assert_eq!(expected[i], res.unwrap());
                i += 1;
            }
            assert_eq!(i, expected.len());
        }
    }

    /// Same round trip as `test_write_struct`, against S3. Ignored by default
    /// because it needs a reachable bucket and real credentials.
    #[ignore = "s3"]
    #[tokio::test]
    async fn test_write_s3() {
        let path = Path::from_url_path("log").unwrap();
        let option = Options::new(path).fs(FsOptions::S3 {
            bucket: "data".to_string(),
            credential: Some(AwsCredential {
                key_id: "key_id".to_string(),
                secret_key: "secret_key".to_string(),
                token: None,
            }),
            endpoint: None,
            region: Some("region".to_string()),
            sign_payload: None,
            checksum: None,
        });

        {
            let mut logger = option.clone().build::<TestStruct>().await.unwrap();
            logger.write(&single_item()).await.unwrap();
            logger.write_batch(test_items().iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected = [vec![single_item()], test_items()];
            let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
            pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                assert_eq!(expected[i], res.unwrap());
                i += 1;
            }
            // Without this an empty stream would pass the test.
            assert_eq!(i, expected.len());
        }
    }

    /// Recovering a log that was created but never written to yields no batches.
    #[tokio::test]
    async fn test_recover_empty() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("empty");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
            let res = stream.try_next().await.unwrap();
            assert!(res.is_none());
        }
    }
}