fusio_log/
log.rs

1use std::{io::Cursor, marker::PhantomData, sync::Arc};
2
3use fusio::{
4    buffered::{BufReader, BufWriter},
5    dynamic::DynFile,
6    fs::OpenOptions,
7    path::Path,
8    DynFs, DynWrite,
9};
10use futures_core::TryStream;
11use futures_util::stream;
12
13use crate::{
14    error::LogError,
15    fs::hash::{HashReader, HashWriter},
16    Decode, Encode, Options, DEFAULT_BUF_SIZE,
17};
18
/// Logger that writes encoded entries of type `T` to a log file through a buffered writer.
pub struct Logger<T> {
    /// Location of the log file; used to open it and, in `remove`, to delete it.
    path: Path,
    /// File system handle used to open the log file and to remove it.
    fs: Arc<dyn DynFs>,
    /// Buffered writer wrapping the opened log file.
    buf_writer: BufWriter<Box<dyn DynFile>>,
    /// Marker tying the logger to its entry type without storing a `T`.
    _mark: PhantomData<T>,
}
25
26impl<T> Logger<T>
27where
28    T: Encode,
29{
30    pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
31        let fs = option.fs_option.parse()?;
32        let file = fs
33            .open_options(
34                &option.path,
35                OpenOptions::default()
36                    .read(true)
37                    .write(true)
38                    .create(true)
39                    .truncate(option.truncate),
40            )
41            .await?;
42
43        let buf_writer = BufWriter::new(file, option.buf_size);
44        Ok(Self {
45            fs,
46            buf_writer,
47            path: option.path,
48            _mark: PhantomData,
49        })
50    }
51
52    pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
53        let file = fs
54            .open_options(
55                &option.path,
56                OpenOptions::default()
57                    .read(true)
58                    .write(true)
59                    .create(true)
60                    .truncate(option.truncate),
61            )
62            .await?;
63
64        let buf_writer = BufWriter::new(file, option.buf_size);
65        Ok(Self {
66            fs,
67            buf_writer,
68            path: option.path,
69            _mark: PhantomData,
70        })
71    }
72}
73
74impl<T> Logger<T>
75where
76    T: Encode,
77{
78    /// Write a batch of log entries to the log file.
79    pub async fn write_batch<'r>(
80        &mut self,
81        data: impl ExactSizeIterator<Item = &'r T>,
82    ) -> Result<(), LogError>
83    where
84        T: 'r,
85    {
86        let mut writer = HashWriter::new(&mut self.buf_writer);
87        (data.len() as u32).encode(&mut writer).await?;
88        for e in data {
89            e.encode(&mut writer)
90                .await
91                .map_err(|err| LogError::Encode {
92                    message: err.to_string(),
93                })?;
94        }
95        writer.eol().await?;
96        Ok(())
97    }
98
99    /// Write a single log entry to the log file. This method has the same behavior as `write_batch`
100    /// but for a single entry.
101    pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
102        let mut writer = HashWriter::new(&mut self.buf_writer);
103
104        1_u32.encode(&mut writer).await.unwrap();
105        data.encode(&mut writer)
106            .await
107            .map_err(|err| LogError::Encode {
108                message: err.to_string(),
109            })?;
110        writer.eol().await?;
111        Ok(())
112    }
113
114    /// Flush the data to the log file.
115    pub async fn flush(&mut self) -> Result<(), LogError> {
116        self.buf_writer.flush().await?;
117        Ok(())
118    }
119
120    /// Close the log file. This will flush the data to the log file and close it.
121    /// It is not guaranteed that the log file can be operated after closing.
122    pub async fn close(&mut self) -> Result<(), LogError> {
123        self.buf_writer.close().await?;
124        Ok(())
125    }
126}
127
128impl<T> Logger<T>
129where
130    T: Decode,
131{
132    pub(crate) async fn recover(
133        option: Options,
134    ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
135        let fs = option.fs_option.parse()?;
136        Self::recover_with_fs(&option.path, fs).await
137    }
138
139    pub(crate) async fn recover_with_fs(
140        path: &Path,
141        fs: Arc<dyn DynFs>,
142    ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
143        let file = BufReader::new(
144            fs.open_options(&path, OpenOptions::default().create(false))
145                .await?,
146            DEFAULT_BUF_SIZE,
147        )
148        .await?;
149
150        Ok(Box::pin(stream::try_unfold(
151            (file, 0),
152            |(mut f, mut pos)| async move {
153                let mut cursor = Cursor::new(&mut f);
154                cursor.set_position(pos);
155                let mut reader = HashReader::new(cursor);
156
157                let Ok(len) = u32::decode(&mut reader).await else {
158                    return Ok(None);
159                };
160                let mut buf = Vec::with_capacity(len as usize);
161                for _ in 0..len {
162                    match T::decode(&mut reader).await {
163                        Ok(record) => {
164                            buf.push(record);
165                        }
166                        Err(err) => {
167                            return Err(LogError::Decode {
168                                message: err.to_string(),
169                            });
170                        }
171                    }
172                }
173
174                pos += reader.position();
175                if !reader.checksum().await? {
176                    return Err(LogError::Checksum);
177                }
178                pos += size_of::<u32>() as u64;
179
180                Ok(Some((buf, (f, pos))))
181            },
182        )))
183    }
184}
185
186impl<T> Logger<T> {
187    /// Remove log file
188    pub async fn remove(self) -> Result<(), LogError> {
189        self.fs.remove(&self.path).await?;
190        Ok(())
191    }
192}
193
194#[cfg(test)]
195mod tests {
196
197    use std::pin::pin;
198
199    use fusio::Error;
200    use futures_util::{StreamExt, TryStreamExt};
201    use tempfile::TempDir;
202
203    use crate::{
204        fs::{AwsCredential, SeqRead, Write},
205        Decode, Encode, FsOptions, Options, Path,
206    };
207
    /// Sample multi-field record used by the tests as a log entry type.
    #[derive(Debug, Clone)]
    struct TestStruct {
        /// Numeric identifier of the record.
        id: u64,
        /// Free-form name string.
        name: String,
        /// Optional e-mail address; `None` exercises the absent case.
        email: Option<String>,
    }
214
215    impl Encode for TestStruct {
216        async fn encode<W>(&self, writer: &mut W) -> Result<(), Error>
217        where
218            W: Write,
219        {
220            self.id.encode(writer).await?;
221            self.name.encode(writer).await.unwrap();
222            self.email.encode(writer).await.unwrap();
223            Ok(())
224        }
225
226        fn size(&self) -> usize {
227            self.id.size() + self.name.size() + self.email.size()
228        }
229    }
230
231    impl Decode for TestStruct {
232        async fn decode<R>(reader: &mut R) -> Result<Self, Error>
233        where
234            R: SeqRead,
235        {
236            let id = u64::decode(reader).await?;
237            let name = String::decode(reader).await.unwrap();
238            let email = Option::<String>::decode(reader).await.unwrap();
239            Ok(Self { id, name, email })
240        }
241    }
242
243    fn test_items() -> Vec<TestStruct> {
244        let mut items = vec![];
245        for i in 0..50 {
246            items.push(TestStruct {
247                id: i,
248                name: format!("Tonbo{}", i),
249                email: Some(format!("fusio{}@tonboio.com", i)),
250            });
251        }
252        items
253    }
254
255    async fn write_u8() {
256        let temp_dir = TempDir::new().unwrap();
257        let path = Path::from_filesystem_path(temp_dir.path())
258            .unwrap()
259            .child("u8");
260
261        {
262            let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
263            logger.write(&1).await.unwrap();
264            logger.write_batch([2, 3, 4].iter()).await.unwrap();
265            logger
266                .write_batch([2, 3, 4, 5, 1, 255].iter())
267                .await
268                .unwrap();
269            logger.flush().await.unwrap();
270            logger.close().await.unwrap();
271        }
272        {
273            let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
274            let stream = Options::new(path)
275                .recover::<u8>()
276                .await
277                .unwrap()
278                .into_stream();
279            let mut stream = pin!(stream);
280            let mut i = 0;
281            while let Some(res) = stream.next().await {
282                assert!(res.is_ok());
283                assert_eq!(&expected[i], &res.unwrap());
284                i += 1;
285            }
286            assert_eq!(i, expected.len())
287        }
288    }
289
    /// Runs the `u8` write/recover round trip on a multi-threaded tokio runtime.
    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_u8() {
        write_u8().await;
    }
295
296    #[cfg(feature = "monoio")]
297    #[monoio::test]
298    async fn test_tokio_write_u8() {
299        write_u8().await;
300    }
301
302    async fn write_struct() {
303        let temp_dir = TempDir::new().unwrap();
304        let path = Path::from_filesystem_path(temp_dir.path())
305            .unwrap()
306            .child("struct");
307
308        {
309            let mut logger = Options::new(path.clone())
310                .build::<TestStruct>()
311                .await
312                .unwrap();
313            logger
314                .write(&TestStruct {
315                    id: 100,
316                    name: "Name".to_string(),
317                    email: None,
318                })
319                .await
320                .unwrap();
321            logger.write_batch(test_items().iter()).await.unwrap();
322            logger.flush().await.unwrap();
323            logger.close().await.unwrap();
324        }
325        {
326            let expected = [
327                &[TestStruct {
328                    id: 100,
329                    name: "Name".to_string(),
330                    email: None,
331                }],
332                &test_items()[0..],
333            ];
334            let stream = Options::new(path)
335                .recover::<TestStruct>()
336                .await
337                .unwrap()
338                .into_stream();
339            let mut stream = pin!(stream);
340            let mut i = 0;
341            while let Some(res) = stream.next().await {
342                assert!(res.is_ok());
343                for (left, right) in expected[i].iter().zip(res.unwrap()) {
344                    assert_eq!(left.id, right.id);
345                    assert_eq!(left.email, right.email);
346                }
347                i += 1;
348            }
349            assert_eq!(i, expected.len())
350        }
351    }
352
    /// Runs the struct write/recover round trip on a multi-threaded tokio runtime.
    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_struct() {
        write_struct().await
    }
358
    /// Runs the struct write/recover round trip on the monoio runtime.
    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_write_struct() {
        write_struct().await
    }
364
365    async fn write_s3() {
366        let path = Path::from_url_path("log").unwrap();
367
368        if option_env!("AWS_ACCESS_KEY_ID").is_none()
369            || option_env!("AWS_SECRET_ACCESS_KEY").is_none()
370        {
371            eprintln!("can not get `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`");
372            return;
373        }
374        let key_id = std::option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string();
375        let secret_key = std::option_env!("AWS_SECRET_ACCESS_KEY")
376            .unwrap()
377            .to_string();
378        let bucket = std::option_env!("BUCKET_NAME")
379            .expect("expected bucket not to be empty")
380            .to_string();
381        let region = std::option_env!("AWS_REGION")
382            .expect("expected region not to be empty")
383            .to_string();
384        let token = std::option_env!("AWS_SESSION_TOKEN").map(|v| v.to_string());
385
386        let option = Options::new(path).truncate(true).fs(FsOptions::S3 {
387            bucket,
388            credential: Some(AwsCredential {
389                key_id,
390                secret_key,
391                token,
392            }),
393            endpoint: None,
394            region: Some(region),
395            sign_payload: None,
396            checksum: None,
397        });
398
399        {
400            let mut logger = option.clone().build::<TestStruct>().await.unwrap();
401            logger
402                .write(&TestStruct {
403                    id: 100,
404                    name: "Name".to_string(),
405                    email: None,
406                })
407                .await
408                .unwrap();
409            logger.write_batch(test_items().iter()).await.unwrap();
410            logger.flush().await.unwrap();
411            logger.close().await.unwrap();
412        }
413        {
414            let expected = [
415                &[TestStruct {
416                    id: 100,
417                    name: "Name".to_string(),
418                    email: None,
419                }],
420                &test_items()[0..],
421            ];
422            let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
423            let mut stream = pin!(stream);
424            let mut i = 0;
425            while let Some(res) = stream.next().await {
426                assert!(res.is_ok());
427                for (left, right) in expected[i].iter().zip(res.unwrap()) {
428                    assert_eq!(left.id, right.id);
429                    assert_eq!(left.email, right.email);
430                }
431                i += 1;
432            }
433        }
434    }
435
    /// Runs the S3 write/recover round trip on a multi-threaded tokio runtime.
    #[cfg(all(feature = "tokio-http", feature = "aws"))]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_s3() {
        write_s3().await;
    }
441
    /// Runs the S3 write/recover round trip on the monoio runtime with its timer enabled.
    /// Ignored by default.
    #[ignore = "s3"]
    #[cfg(all(feature = "monoio-http", feature = "aws"))]
    #[monoio::test(enable_timer = true)]
    async fn test_monoio_write_s3() {
        write_s3().await;
    }
448
449    async fn recover_empty() {
450        let temp_dir = TempDir::new().unwrap();
451        let path = Path::from_filesystem_path(temp_dir.path())
452            .unwrap()
453            .child("empty");
454
455        {
456            let mut logger = Options::new(path.clone())
457                .build::<TestStruct>()
458                .await
459                .unwrap();
460            logger.flush().await.unwrap();
461            logger.close().await.unwrap();
462        }
463        {
464            let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
465            let res = stream.try_next().await.unwrap();
466            assert!(res.is_none());
467        }
468    }
469
    /// Runs the empty-log recovery check on a multi-threaded tokio runtime.
    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_recover_empty() {
        recover_empty().await;
    }
475
476    #[cfg(feature = "monoio")]
477    #[monoio::test]
478    async fn test_recover_empty() {
479        recover_empty().await;
480    }
481
482    async fn disable_buffer() {
483        let temp_dir = TempDir::new().unwrap();
484        let path = Path::from_filesystem_path(temp_dir.path())
485            .unwrap()
486            .child("disable_buf");
487
488        {
489            let mut logger = Options::new(path.clone())
490                .disable_buf()
491                .build::<u8>()
492                .await
493                .unwrap();
494            logger.write(&1).await.unwrap();
495            logger.write_batch([2, 3, 4].iter()).await.unwrap();
496            logger
497                .write_batch([2, 3, 4, 5, 1, 255].iter())
498                .await
499                .unwrap();
500            logger.flush().await.unwrap();
501            logger.close().await.unwrap();
502        }
503        {
504            let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
505            let mut stream = Options::new(path)
506                .recover::<u8>()
507                .await
508                .unwrap()
509                .into_stream();
510            let mut stream = pin!(stream);
511            let mut i = 0;
512            while let Some(res) = stream.next().await {
513                assert!(res.is_ok());
514                assert_eq!(&expected[i], &res.unwrap());
515                i += 1;
516            }
517            assert_eq!(i, expected.len())
518        }
519    }
520
    /// Runs the unbuffered write/recover round trip on a multi-threaded tokio runtime.
    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_disable_buffer() {
        disable_buffer().await;
    }
526
527    #[cfg(feature = "monoio")]
528    #[monoio::test]
529    async fn test_disable_buffer() {
530        disable_buffer().await;
531    }
532}