1pub mod error;
2pub mod fs;
3mod option;
4mod serdes;
5use std::{io::Cursor, marker::PhantomData, sync::Arc};
6
7use error::LogError;
8use fs::hash::{HashReader, HashWriter};
9pub use fusio::path::Path;
10use fusio::{
11 buffered::{BufReader, BufWriter},
12 dynamic::DynFile,
13 fs::OpenOptions,
14 DynFs, DynWrite,
15};
16use futures_core::TryStream;
17use futures_util::stream;
18#[allow(unused)]
19pub use option::*;
20pub use serdes::*;
21
22pub struct Logger<T> {
23 path: Path,
24 fs: Arc<dyn DynFs>,
25 buf_writer: BufWriter<Box<dyn DynFile>>,
26 _mark: PhantomData<T>,
27}
28
29impl<T> Logger<T>
30where
31 T: Encode,
32{
33 pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
34 let fs = option.fs_option.parse()?;
35 let file = fs
36 .open_options(
37 &option.path,
38 OpenOptions::default()
39 .read(true)
40 .write(true)
41 .create(true)
42 .truncate(option.truncate),
43 )
44 .await?;
45
46 let buf_writer = BufWriter::new(file, option.buf_size);
47 Ok(Self {
48 fs,
49 buf_writer,
50 path: option.path,
51 _mark: PhantomData,
52 })
53 }
54
55 pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
56 let file = fs
57 .open_options(
58 &option.path,
59 OpenOptions::default()
60 .read(true)
61 .write(true)
62 .create(true)
63 .truncate(option.truncate),
64 )
65 .await?;
66
67 let buf_writer = BufWriter::new(file, option.buf_size);
68 Ok(Self {
69 fs,
70 buf_writer,
71 path: option.path,
72 _mark: PhantomData,
73 })
74 }
75}
76
77impl<T> Logger<T>
78where
79 T: Encode,
80{
81 pub async fn write_batch<'r>(
83 &mut self,
84 data: impl ExactSizeIterator<Item = &'r T>,
85 ) -> Result<(), LogError>
86 where
87 T: 'r,
88 {
89 let mut writer = HashWriter::new(&mut self.buf_writer);
90 (data.len() as u32).encode(&mut writer).await?;
91 for e in data {
92 e.encode(&mut writer)
93 .await
94 .map_err(|err| LogError::Encode {
95 message: err.to_string(),
96 })?;
97 }
98 writer.eol().await?;
99 Ok(())
100 }
101
102 pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
105 let mut writer = HashWriter::new(&mut self.buf_writer);
106
107 1_u32.encode(&mut writer).await.unwrap();
108 data.encode(&mut writer)
109 .await
110 .map_err(|err| LogError::Encode {
111 message: err.to_string(),
112 })?;
113 writer.eol().await?;
114 Ok(())
115 }
116
117 pub async fn flush(&mut self) -> Result<(), LogError> {
119 self.buf_writer.flush().await?;
120 Ok(())
121 }
122
123 pub async fn close(&mut self) -> Result<(), LogError> {
126 self.buf_writer.close().await?;
127 Ok(())
128 }
129}
130
131impl<T> Logger<T>
132where
133 T: Decode,
134{
135 pub(crate) async fn recover(
136 option: Options,
137 ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
138 let fs = option.fs_option.parse()?;
139 let file = BufReader::new(
140 fs.open_options(&option.path, OpenOptions::default().create(false))
141 .await?,
142 DEFAULT_BUF_SIZE,
143 )
144 .await?;
145
146 Ok(Box::pin(stream::try_unfold(
147 (file, 0),
148 |(mut f, mut pos)| async move {
149 let mut cursor = Cursor::new(&mut f);
150 cursor.set_position(pos);
151 let mut reader = HashReader::new(cursor);
152
153 let Ok(len) = u32::decode(&mut reader).await else {
154 return Ok(None);
155 };
156 let mut buf = Vec::with_capacity(len as usize);
157 for _ in 0..len {
158 match T::decode(&mut reader).await {
159 Ok(record) => {
160 buf.push(record);
161 }
162 Err(err) => {
163 return Err(LogError::Decode {
164 message: err.to_string(),
165 });
166 }
167 }
168 }
169
170 pos += reader.position();
171 if !reader.checksum().await? {
172 return Err(LogError::Checksum);
173 }
174 pos += size_of::<u32>() as u64;
175
176 Ok(Some((buf, (f, pos))))
177 },
178 )))
179 }
180}
181
182impl<T> Logger<T> {
183 pub async fn remove(self) -> Result<(), LogError> {
185 self.fs.remove(&self.path).await?;
186 Ok(())
187 }
188}
189
190#[cfg(test)]
191mod tests {
192
193 use std::pin::pin;
194
195 use futures_util::{StreamExt, TryStreamExt};
196 use tempfile::TempDir;
197
198 use crate::{
199 fs::{AwsCredential, SeqRead, Write},
200 Decode, Encode, FsOptions, Options, Path,
201 };
202
203 #[derive(Debug, Clone)]
204 struct TestStruct {
205 id: u64,
206 name: String,
207 email: Option<String>,
208 }
209
210 impl Encode for TestStruct {
211 type Error = fusio::Error;
212
213 async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
214 where
215 W: Write,
216 {
217 self.id.encode(writer).await?;
218 self.name.encode(writer).await.unwrap();
219 self.email.encode(writer).await.unwrap();
220 Ok(())
221 }
222
223 fn size(&self) -> usize {
224 self.id.size() + self.name.size() + self.email.size()
225 }
226 }
227
228 impl Decode for TestStruct {
229 type Error = fusio::Error;
230
231 async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
232 where
233 R: SeqRead,
234 {
235 let id = u64::decode(reader).await?;
236 let name = String::decode(reader).await.unwrap();
237 let email = Option::<String>::decode(reader).await.unwrap();
238 Ok(Self { id, name, email })
239 }
240 }
241
242 fn test_items() -> Vec<TestStruct> {
243 let mut items = vec![];
244 for i in 0..50 {
245 items.push(TestStruct {
246 id: i,
247 name: format!("Tonbo{}", i),
248 email: Some(format!("fusio{}@tonboio.com", i)),
249 });
250 }
251 items
252 }
253
254 async fn write_u8() {
255 let temp_dir = TempDir::new().unwrap();
256 let path = Path::from_filesystem_path(temp_dir.path())
257 .unwrap()
258 .child("u8");
259
260 {
261 let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
262 logger.write(&1).await.unwrap();
263 logger.write_batch([2, 3, 4].iter()).await.unwrap();
264 logger
265 .write_batch([2, 3, 4, 5, 1, 255].iter())
266 .await
267 .unwrap();
268 logger.flush().await.unwrap();
269 logger.close().await.unwrap();
270 }
271 {
272 let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
273 let stream = Options::new(path)
274 .recover::<u8>()
275 .await
276 .unwrap()
277 .into_stream();
278 let mut stream = pin!(stream);
279 let mut i = 0;
280 while let Some(res) = stream.next().await {
281 assert!(res.is_ok());
282 assert_eq!(&expected[i], &res.unwrap());
283 i += 1;
284 }
285 assert_eq!(i, expected.len())
286 }
287 }
288
289 #[cfg(feature = "tokio")]
290 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
291 async fn test_tokio_write_u8() {
292 write_u8().await;
293 }
294
295 #[cfg(feature = "monoio")]
296 #[monoio::test]
297 async fn test_tokio_write_u8() {
298 write_u8().await;
299 }
300
301 async fn write_struct() {
302 let temp_dir = TempDir::new().unwrap();
303 let path = Path::from_filesystem_path(temp_dir.path())
304 .unwrap()
305 .child("struct");
306
307 {
308 let mut logger = Options::new(path.clone())
309 .build::<TestStruct>()
310 .await
311 .unwrap();
312 logger
313 .write(&TestStruct {
314 id: 100,
315 name: "Name".to_string(),
316 email: None,
317 })
318 .await
319 .unwrap();
320 logger.write_batch(test_items().iter()).await.unwrap();
321 logger.flush().await.unwrap();
322 logger.close().await.unwrap();
323 }
324 {
325 let expected = [
326 &[TestStruct {
327 id: 100,
328 name: "Name".to_string(),
329 email: None,
330 }],
331 &test_items()[0..],
332 ];
333 let stream = Options::new(path)
334 .recover::<TestStruct>()
335 .await
336 .unwrap()
337 .into_stream();
338 let mut stream = pin!(stream);
339 let mut i = 0;
340 while let Some(res) = stream.next().await {
341 assert!(res.is_ok());
342 for (left, right) in expected[i].iter().zip(res.unwrap()) {
343 assert_eq!(left.id, right.id);
344 assert_eq!(left.email, right.email);
345 }
346 i += 1;
347 }
348 assert_eq!(i, expected.len())
349 }
350 }
351
352 #[cfg(feature = "tokio")]
353 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
354 async fn test_tokio_write_struct() {
355 write_struct().await
356 }
357
358 #[cfg(feature = "monoio")]
359 #[monoio::test]
360 async fn test_monoio_write_struct() {
361 write_struct().await
362 }
363
364 async fn write_s3() {
365 let path = Path::from_url_path("log").unwrap();
366
367 if option_env!("AWS_ACCESS_KEY_ID").is_none()
368 || option_env!("AWS_SECRET_ACCESS_KEY").is_none()
369 {
370 eprintln!("can not get `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`");
371 return;
372 }
373 let key_id = std::option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string();
374 let secret_key = std::option_env!("AWS_SECRET_ACCESS_KEY")
375 .unwrap()
376 .to_string();
377
378 let option = Options::new(path).truncate(true).fs(FsOptions::S3 {
379 bucket: "fusio-test".to_string(),
380 credential: Some(AwsCredential {
381 key_id,
382 secret_key,
383 token: None,
384 }),
385 endpoint: None,
386 region: Some("ap-southeast-1".to_string()),
387 sign_payload: None,
388 checksum: None,
389 });
390
391 {
392 let mut logger = option.clone().build::<TestStruct>().await.unwrap();
393 logger
394 .write(&TestStruct {
395 id: 100,
396 name: "Name".to_string(),
397 email: None,
398 })
399 .await
400 .unwrap();
401 logger.write_batch(test_items().iter()).await.unwrap();
402 logger.flush().await.unwrap();
403 logger.close().await.unwrap();
404 }
405 {
406 let expected = [
407 &[TestStruct {
408 id: 100,
409 name: "Name".to_string(),
410 email: None,
411 }],
412 &test_items()[0..],
413 ];
414 let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
415 let mut stream = pin!(stream);
416 let mut i = 0;
417 while let Some(res) = stream.next().await {
418 assert!(res.is_ok());
419 for (left, right) in expected[i].iter().zip(res.unwrap()) {
420 assert_eq!(left.id, right.id);
421 assert_eq!(left.email, right.email);
422 }
423 i += 1;
424 }
425 }
426 }
427
428 #[ignore = "s3"]
429 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
430 async fn test_tokio_write_s3() {
431 write_s3().await;
432 }
433
434 #[ignore = "s3"]
435 #[cfg(feature = "monoio")]
436 #[monoio::test(enable_timer = true)]
437 async fn test_monoio_write_s3() {
438 write_s3().await;
439 }
440
441 async fn recover_empty() {
442 let temp_dir = TempDir::new().unwrap();
443 let path = Path::from_filesystem_path(temp_dir.path())
444 .unwrap()
445 .child("empty");
446
447 {
448 let mut logger = Options::new(path.clone())
449 .build::<TestStruct>()
450 .await
451 .unwrap();
452 logger.flush().await.unwrap();
453 logger.close().await.unwrap();
454 }
455 {
456 let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
457 let res = stream.try_next().await.unwrap();
458 assert!(res.is_none());
459 }
460 }
461
462 #[cfg(feature = "tokio")]
463 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
464 async fn test_recover_empty() {
465 recover_empty().await;
466 }
467
468 #[cfg(feature = "monoio")]
469 #[monoio::test]
470 async fn test_recover_empty() {
471 recover_empty().await;
472 }
473
474 async fn disable_buffer() {
475 let temp_dir = TempDir::new().unwrap();
476 let path = Path::from_filesystem_path(temp_dir.path())
477 .unwrap()
478 .child("disable_buf");
479
480 {
481 let mut logger = Options::new(path.clone())
482 .disable_buf()
483 .build::<u8>()
484 .await
485 .unwrap();
486 logger.write(&1).await.unwrap();
487 logger.write_batch([2, 3, 4].iter()).await.unwrap();
488 logger
489 .write_batch([2, 3, 4, 5, 1, 255].iter())
490 .await
491 .unwrap();
492 logger.flush().await.unwrap();
493 logger.close().await.unwrap();
494 }
495 {
496 let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
497 let mut stream = Options::new(path)
498 .recover::<u8>()
499 .await
500 .unwrap()
501 .into_stream();
502 let mut stream = pin!(stream);
503 let mut i = 0;
504 while let Some(res) = stream.next().await {
505 assert!(res.is_ok());
506 assert_eq!(&expected[i], &res.unwrap());
507 i += 1;
508 }
509 assert_eq!(i, expected.len())
510 }
511 }
512
513 #[cfg(feature = "tokio")]
514 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
515 async fn test_disable_buffer() {
516 disable_buffer().await;
517 }
518
519 #[cfg(feature = "monoio")]
520 #[monoio::test]
521 async fn test_disable_buffer() {
522 disable_buffer().await;
523 }
524}