fusio_log/
lib.rs

1pub mod error;
2pub mod fs;
3mod option;
4mod serdes;
5use std::{io::Cursor, marker::PhantomData, sync::Arc};
6
7use error::LogError;
8use fs::hash::{HashReader, HashWriter};
9pub use fusio::path::Path;
10use fusio::{buffered::BufWriter, dynamic::DynFile, fs::OpenOptions, DynFs, DynWrite};
11use futures_core::TryStream;
12use futures_util::stream;
13#[allow(unused)]
14pub use option::*;
15pub use serdes::*;
16
/// Log of `T` records backed by a single file.
///
/// The record type only appears as a marker; the bounds needed to write
/// (`Encode`) or recover (`Decode`) records are placed on the `impl` blocks.
pub struct Logger<T> {
    /// Location of the log file; `remove` passes it to the file system to delete the file.
    path: Path,
    /// File system handle the log file was opened on.
    fs: Arc<dyn DynFs>,
    /// Buffered writer wrapping the opened log file; all writes go through it.
    buf_writer: BufWriter<Box<dyn DynFile>>,
    /// Binds the logger to its record type without storing a `T`.
    _mark: PhantomData<T>,
}
23
24impl<T> Logger<T>
25where
26    T: Encode,
27{
28    pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
29        let fs = option.fs_option.parse()?;
30        let file = fs
31            .open_options(
32                &option.path,
33                OpenOptions::default()
34                    .read(true)
35                    .write(true)
36                    .create(true)
37                    .truncate(option.truncate),
38            )
39            .await?;
40
41        let buf_writer = BufWriter::new(file, option.buf_size);
42        Ok(Self {
43            fs,
44            buf_writer,
45            path: option.path,
46            _mark: PhantomData,
47        })
48    }
49
50    pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
51        let file = fs
52            .open_options(
53                &option.path,
54                OpenOptions::default()
55                    .read(true)
56                    .write(true)
57                    .create(true)
58                    .truncate(option.truncate),
59            )
60            .await?;
61
62        let buf_writer = BufWriter::new(file, option.buf_size);
63        Ok(Self {
64            fs,
65            buf_writer,
66            path: option.path,
67            _mark: PhantomData,
68        })
69    }
70}
71
72impl<T> Logger<T>
73where
74    T: Encode,
75{
76    pub async fn write_batch<'r>(
77        &mut self,
78        data: impl ExactSizeIterator<Item = &'r T>,
79    ) -> Result<(), LogError>
80    where
81        T: 'r,
82    {
83        let mut writer = HashWriter::new(&mut self.buf_writer);
84        (data.len() as u32).encode(&mut writer).await?;
85        for e in data {
86            e.encode(&mut writer)
87                .await
88                .map_err(|err| LogError::Encode {
89                    message: err.to_string(),
90                })?;
91        }
92        writer.eol().await?;
93        Ok(())
94    }
95
96    pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
97        let mut writer = HashWriter::new(&mut self.buf_writer);
98
99        1_u32.encode(&mut writer).await.unwrap();
100        data.encode(&mut writer)
101            .await
102            .map_err(|err| LogError::Encode {
103                message: err.to_string(),
104            })?;
105        writer.eol().await?;
106        Ok(())
107    }
108
109    pub async fn flush(&mut self) -> Result<(), LogError> {
110        self.buf_writer.flush().await?;
111        Ok(())
112    }
113
114    pub async fn close(&mut self) -> Result<(), LogError> {
115        self.buf_writer.close().await?;
116        Ok(())
117    }
118}
119
120impl<T> Logger<T>
121where
122    T: Decode,
123{
124    pub(crate) async fn recover(
125        option: Options,
126    ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
127        let fs = option.fs_option.parse()?;
128        let file = fs
129            .open_options(&option.path, OpenOptions::default().create(false))
130            .await?;
131
132        Ok(Box::pin(stream::try_unfold(
133            (file, 0),
134            |(mut f, mut pos)| async move {
135                let mut cursor = Cursor::new(&mut f);
136                cursor.set_position(pos);
137                let mut reader = HashReader::new(cursor);
138
139                let Ok(len) = u32::decode(&mut reader).await else {
140                    f.close().await?;
141                    return Ok(None);
142                };
143                let mut buf = Vec::with_capacity(len as usize);
144                for _ in 0..len {
145                    match T::decode(&mut reader).await {
146                        Ok(record) => {
147                            buf.push(record);
148                        }
149                        Err(err) => {
150                            return Err(LogError::Decode {
151                                message: err.to_string(),
152                            });
153                        }
154                    }
155                }
156
157                pos += reader.position();
158                if !reader.checksum().await? {
159                    return Err(LogError::Checksum);
160                }
161                pos += size_of::<u32>() as u64;
162
163                Ok(Some((buf, (f, pos))))
164            },
165        )))
166    }
167}
168
169impl<T> Logger<T> {
170    /// Remove log file
171    pub async fn remove(self) -> Result<(), LogError> {
172        self.fs.remove(&self.path).await?;
173        Ok(())
174    }
175}
176
#[cfg(test)]
mod tests {

    use futures_util::{StreamExt, TryStreamExt};
    use tempfile::TempDir;
    use tokio::pin;

    use crate::{
        fs::{AwsCredential, SeqRead, Write},
        Decode, Encode, FsOptions, Options, Path,
    };

    /// Record with fixed-size, variable-size and optional fields used to
    /// exercise the round trip through the log.
    #[derive(Debug, Clone)]
    struct TestStruct {
        id: u64,
        name: String,
        email: Option<String>,
    }

    impl Encode for TestStruct {
        type Error = fusio::Error;

        async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
        where
            W: Write,
        {
            self.id.encode(writer).await?;
            self.name.encode(writer).await.unwrap();
            self.email.encode(writer).await.unwrap();
            Ok(())
        }

        fn size(&self) -> usize {
            self.id.size() + self.name.size() + self.email.size()
        }
    }

    impl Decode for TestStruct {
        type Error = fusio::Error;

        async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
        where
            R: SeqRead,
        {
            let id = u64::decode(reader).await?;
            let name = String::decode(reader).await.unwrap();
            let email = Option::<String>::decode(reader).await.unwrap();
            Ok(Self { id, name, email })
        }
    }

    /// Builds 50 distinct records with ids `0..50`.
    fn test_items() -> Vec<TestStruct> {
        (0..50)
            .map(|i| TestStruct {
                id: i,
                name: format!("Tonbo{}", i),
                email: Some(format!("fusio{}@tonboio.com", i)),
            })
            .collect()
    }

    /// The single record written through `Logger::write` in the struct tests.
    fn single_item() -> TestStruct {
        TestStruct {
            id: 100,
            name: "Name".to_string(),
            email: None,
        }
    }

    /// Asserts that two batches have the same length and the same fields in
    /// every position. A bare `zip` would silently accept a short batch.
    fn assert_same_records(expected: &[TestStruct], actual: &[TestStruct]) {
        assert_eq!(expected.len(), actual.len());
        for (left, right) in expected.iter().zip(actual) {
            assert_eq!(left.id, right.id);
            assert_eq!(left.name, right.name);
            assert_eq!(left.email, right.email);
        }
    }

    #[tokio::test]
    async fn test_write_u8() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("u8");

        {
            let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
            logger.write(&1).await.unwrap();
            logger.write_batch([2, 3, 4].iter()).await.unwrap();
            logger
                .write_batch([2, 3, 4, 5, 1, 255].iter())
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
            let stream = Options::new(path)
                .recover::<u8>()
                .await
                .unwrap()
                .into_stream();
            pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                assert!(res.is_ok());
                assert_eq!(&expected[i], &res.unwrap());
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[tokio::test]
    async fn test_write_struct() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("struct");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.write(&single_item()).await.unwrap();
            logger.write_batch(test_items().iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let single = [single_item()];
            let items = test_items();
            let expected = [&single[..], &items[..]];
            let stream = Options::new(path)
                .recover::<TestStruct>()
                .await
                .unwrap()
                .into_stream();
            pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                assert!(res.is_ok());
                assert_same_records(expected[i], &res.unwrap());
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[ignore = "s3"]
    #[tokio::test]
    async fn test_write_s3() {
        let path = Path::from_url_path("log").unwrap();
        let option = Options::new(path).fs(FsOptions::S3 {
            bucket: "data".to_string(),
            credential: Some(AwsCredential {
                key_id: "key_id".to_string(),
                secret_key: "secret_key".to_string(),
                token: None,
            }),
            endpoint: None,
            region: Some("region".to_string()),
            sign_payload: None,
            checksum: None,
        });

        {
            let mut logger = option.clone().build::<TestStruct>().await.unwrap();
            logger.write(&single_item()).await.unwrap();
            logger.write_batch(test_items().iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let single = [single_item()];
            let items = test_items();
            let expected = [&single[..], &items[..]];
            let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
            pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                assert!(res.is_ok());
                assert_same_records(expected[i], &res.unwrap());
                i += 1;
            }
            // Without this the test passes even when nothing is recovered.
            assert_eq!(i, expected.len())
        }
    }

    #[tokio::test]
    async fn test_recover_empty() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("empty");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
            let res = stream.try_next().await.unwrap();
            assert!(res.is_none());
        }
    }
}