fusio_log/
lib.rs

1pub mod error;
2pub mod fs;
3mod option;
4mod serdes;
5use std::{io::Cursor, marker::PhantomData, sync::Arc};
6
7use error::LogError;
8use fs::hash::{HashReader, HashWriter};
9pub use fusio::path::Path;
10use fusio::{
11    buffered::{BufReader, BufWriter},
12    dynamic::DynFile,
13    fs::OpenOptions,
14    DynFs, DynWrite,
15};
16use futures_core::TryStream;
17use futures_util::stream;
18#[allow(unused)]
19pub use option::*;
20pub use serdes::*;
21
/// Append-only log of `T` entries backed by a single file.
///
/// Entries are written through `buf_writer`; `fs` and `path` are retained so the
/// file can later be deleted via `remove`.
pub struct Logger<T> {
    /// Location of the log file; passed to `fs` when opening and removing it.
    path: Path,
    /// File system the log file was opened on.
    fs: Arc<dyn DynFs>,
    /// Buffered writer wrapping the opened log file; all writes, flushes and the
    /// final close go through it.
    buf_writer: BufWriter<Box<dyn DynFile>>,
    /// Binds the logger to its entry type `T` without storing a value of it.
    _mark: PhantomData<T>,
}
28
29impl<T> Logger<T>
30where
31    T: Encode,
32{
33    pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
34        let fs = option.fs_option.parse()?;
35        let file = fs
36            .open_options(
37                &option.path,
38                OpenOptions::default()
39                    .read(true)
40                    .write(true)
41                    .create(true)
42                    .truncate(option.truncate),
43            )
44            .await?;
45
46        let buf_writer = BufWriter::new(file, option.buf_size);
47        Ok(Self {
48            fs,
49            buf_writer,
50            path: option.path,
51            _mark: PhantomData,
52        })
53    }
54
55    pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
56        let file = fs
57            .open_options(
58                &option.path,
59                OpenOptions::default()
60                    .read(true)
61                    .write(true)
62                    .create(true)
63                    .truncate(option.truncate),
64            )
65            .await?;
66
67        let buf_writer = BufWriter::new(file, option.buf_size);
68        Ok(Self {
69            fs,
70            buf_writer,
71            path: option.path,
72            _mark: PhantomData,
73        })
74    }
75}
76
77impl<T> Logger<T>
78where
79    T: Encode,
80{
81    /// Write a batch of log entries to the log file.
82    pub async fn write_batch<'r>(
83        &mut self,
84        data: impl ExactSizeIterator<Item = &'r T>,
85    ) -> Result<(), LogError>
86    where
87        T: 'r,
88    {
89        let mut writer = HashWriter::new(&mut self.buf_writer);
90        (data.len() as u32).encode(&mut writer).await?;
91        for e in data {
92            e.encode(&mut writer)
93                .await
94                .map_err(|err| LogError::Encode {
95                    message: err.to_string(),
96                })?;
97        }
98        writer.eol().await?;
99        Ok(())
100    }
101
102    /// Write a single log entry to the log file. This method has the same behavior as `write_batch`
103    /// but for a single entry.
104    pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
105        let mut writer = HashWriter::new(&mut self.buf_writer);
106
107        1_u32.encode(&mut writer).await.unwrap();
108        data.encode(&mut writer)
109            .await
110            .map_err(|err| LogError::Encode {
111                message: err.to_string(),
112            })?;
113        writer.eol().await?;
114        Ok(())
115    }
116
117    /// Flush the data to the log file.
118    pub async fn flush(&mut self) -> Result<(), LogError> {
119        self.buf_writer.flush().await?;
120        Ok(())
121    }
122
123    /// Close the log file. This will flush the data to the log file and close it.
124    /// It is not guaranteed that the log file can be operated after closing.
125    pub async fn close(&mut self) -> Result<(), LogError> {
126        self.buf_writer.close().await?;
127        Ok(())
128    }
129}
130
131impl<T> Logger<T>
132where
133    T: Decode,
134{
135    pub(crate) async fn recover(
136        option: Options,
137    ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
138        let fs = option.fs_option.parse()?;
139        let file = BufReader::new(
140            fs.open_options(&option.path, OpenOptions::default().create(false))
141                .await?,
142            DEFAULT_BUF_SIZE,
143        )
144        .await?;
145
146        Ok(Box::pin(stream::try_unfold(
147            (file, 0),
148            |(mut f, mut pos)| async move {
149                let mut cursor = Cursor::new(&mut f);
150                cursor.set_position(pos);
151                let mut reader = HashReader::new(cursor);
152
153                let Ok(len) = u32::decode(&mut reader).await else {
154                    return Ok(None);
155                };
156                let mut buf = Vec::with_capacity(len as usize);
157                for _ in 0..len {
158                    match T::decode(&mut reader).await {
159                        Ok(record) => {
160                            buf.push(record);
161                        }
162                        Err(err) => {
163                            return Err(LogError::Decode {
164                                message: err.to_string(),
165                            });
166                        }
167                    }
168                }
169
170                pos += reader.position();
171                if !reader.checksum().await? {
172                    return Err(LogError::Checksum);
173                }
174                pos += size_of::<u32>() as u64;
175
176                Ok(Some((buf, (f, pos))))
177            },
178        )))
179    }
180}
181
182impl<T> Logger<T> {
183    /// Remove log file
184    pub async fn remove(self) -> Result<(), LogError> {
185        self.fs.remove(&self.path).await?;
186        Ok(())
187    }
188}
189
#[cfg(test)]
mod tests {

    use std::pin::pin;

    use futures_util::{StreamExt, TryStreamExt};
    use tempfile::TempDir;

    use crate::{
        fs::{AwsCredential, SeqRead, Write},
        Decode, Encode, FsOptions, Options, Path,
    };

    /// Sample record with a fixed-width, a variable-length and an optional field.
    #[derive(Debug, Clone, PartialEq)]
    struct TestStruct {
        id: u64,
        name: String,
        email: Option<String>,
    }

    impl Encode for TestStruct {
        type Error = fusio::Error;

        async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
        where
            W: Write,
        {
            self.id.encode(writer).await?;
            // The field encoders may use error types that do not convert into
            // `fusio::Error`, so failures here simply abort the test.
            self.name.encode(writer).await.unwrap();
            self.email.encode(writer).await.unwrap();
            Ok(())
        }

        fn size(&self) -> usize {
            self.id.size() + self.name.size() + self.email.size()
        }
    }

    impl Decode for TestStruct {
        type Error = fusio::Error;

        async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
        where
            R: SeqRead,
        {
            let id = u64::decode(reader).await?;
            let name = String::decode(reader).await.unwrap();
            let email = Option::<String>::decode(reader).await.unwrap();
            Ok(Self { id, name, email })
        }
    }

    /// Builds 50 distinct records used as a multi-entry batch.
    fn test_items() -> Vec<TestStruct> {
        (0..50)
            .map(|i| TestStruct {
                id: i,
                name: format!("Tonbo{}", i),
                email: Some(format!("fusio{}@tonboio.com", i)),
            })
            .collect()
    }

    /// The single record written with `Logger::write` in the struct tests.
    fn single_item() -> TestStruct {
        TestStruct {
            id: 100,
            name: "Name".to_string(),
            email: None,
        }
    }

    /// Writes three `u8` batches and checks that recovery yields exactly them.
    async fn write_u8() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("u8");

        {
            let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
            logger.write(&1).await.unwrap();
            logger.write_batch([2, 3, 4].iter()).await.unwrap();
            logger
                .write_batch([2, 3, 4, 5, 1, 255].iter())
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected: [Vec<u8>; 3] = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
            let stream = Options::new(path)
                .recover::<u8>()
                .await
                .unwrap()
                .into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.expect("recovered batch should decode");
                assert_eq!(expected[i], batch);
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_u8() {
        write_u8().await;
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_write_u8() {
        write_u8().await;
    }

    /// Writes one single record and one 50-record batch, then checks that
    /// recovery yields both batches unchanged.
    async fn write_struct() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("struct");

        let single = single_item();
        let items = test_items();

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.write(&single).await.unwrap();
            logger.write_batch(items.iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected: [&[TestStruct]; 2] = [std::slice::from_ref(&single), &items];
            let stream = Options::new(path)
                .recover::<TestStruct>()
                .await
                .unwrap()
                .into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.expect("recovered batch should decode");
                // Whole-slice comparison also catches missing or extra records,
                // which a field-by-field `zip` would silently ignore.
                assert_eq!(expected[i], batch.as_slice());
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_struct() {
        write_struct().await
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_write_struct() {
        write_struct().await
    }

    /// Same round trip as `write_struct`, but against the `fusio-test` S3 bucket.
    ///
    /// Credentials are read with `option_env!`, i.e. from the environment of the
    /// compiler; when they are absent the test is skipped with a message.
    async fn write_s3() {
        let path = Path::from_url_path("log").unwrap();

        let (Some(key_id), Some(secret_key)) = (
            option_env!("AWS_ACCESS_KEY_ID"),
            option_env!("AWS_SECRET_ACCESS_KEY"),
        ) else {
            eprintln!("can not get `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`");
            return;
        };
        let key_id = key_id.to_string();
        let secret_key = secret_key.to_string();

        let option = Options::new(path).truncate(true).fs(FsOptions::S3 {
            bucket: "fusio-test".to_string(),
            credential: Some(AwsCredential {
                key_id,
                secret_key,
                token: None,
            }),
            endpoint: None,
            region: Some("ap-southeast-1".to_string()),
            sign_payload: None,
            checksum: None,
        });

        let single = single_item();
        let items = test_items();

        {
            let mut logger = option.clone().build::<TestStruct>().await.unwrap();
            logger.write(&single).await.unwrap();
            logger.write_batch(items.iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected: [&[TestStruct]; 2] = [std::slice::from_ref(&single), &items];
            let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.expect("recovered batch should decode");
                assert_eq!(expected[i], batch.as_slice());
                i += 1;
            }
            // Without this check an empty recovery would pass unnoticed.
            assert_eq!(i, expected.len());
        }
    }

    #[ignore = "s3"]
    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_s3() {
        write_s3().await;
    }

    #[ignore = "s3"]
    #[cfg(feature = "monoio")]
    #[monoio::test(enable_timer = true)]
    async fn test_monoio_write_s3() {
        write_s3().await;
    }

    /// A log that was created but never written to must recover as an empty
    /// stream rather than an error.
    async fn recover_empty() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("empty");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
            let res = stream.try_next().await.unwrap();
            assert!(res.is_none());
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_recover_empty() {
        recover_empty().await;
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_recover_empty() {
        recover_empty().await;
    }

    /// Same round trip as `write_u8`, with write buffering disabled.
    async fn disable_buffer() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("disable_buf");

        {
            let mut logger = Options::new(path.clone())
                .disable_buf()
                .build::<u8>()
                .await
                .unwrap();
            logger.write(&1).await.unwrap();
            logger.write_batch([2, 3, 4].iter()).await.unwrap();
            logger
                .write_batch([2, 3, 4, 5, 1, 255].iter())
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected: [Vec<u8>; 3] = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
            let stream = Options::new(path)
                .recover::<u8>()
                .await
                .unwrap()
                .into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.expect("recovered batch should decode");
                assert_eq!(expected[i], batch);
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_disable_buffer() {
        disable_buffer().await;
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_disable_buffer() {
        disable_buffer().await;
    }
}