1use std::{io::Cursor, marker::PhantomData, sync::Arc};
2
3use fusio::{
4 buffered::{BufReader, BufWriter},
5 dynamic::DynFile,
6 fs::OpenOptions,
7 path::Path,
8 DynFs, DynWrite,
9};
10use futures_core::TryStream;
11use futures_util::stream;
12
13use crate::{
14 error::LogError,
15 fs::hash::{HashReader, HashWriter},
16 Decode, Encode, Options, DEFAULT_BUF_SIZE,
17};
18
19pub struct Logger<T> {
20 path: Path,
21 fs: Arc<dyn DynFs>,
22 buf_writer: BufWriter<Box<dyn DynFile>>,
23 _mark: PhantomData<T>,
24}
25
26impl<T> Logger<T>
27where
28 T: Encode,
29{
30 pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
31 let fs = option.fs_option.parse()?;
32 let file = fs
33 .open_options(
34 &option.path,
35 OpenOptions::default()
36 .read(true)
37 .write(true)
38 .create(true)
39 .truncate(option.truncate),
40 )
41 .await?;
42
43 let buf_writer = BufWriter::new(file, option.buf_size);
44 Ok(Self {
45 fs,
46 buf_writer,
47 path: option.path,
48 _mark: PhantomData,
49 })
50 }
51
52 pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
53 let file = fs
54 .open_options(
55 &option.path,
56 OpenOptions::default()
57 .read(true)
58 .write(true)
59 .create(true)
60 .truncate(option.truncate),
61 )
62 .await?;
63
64 let buf_writer = BufWriter::new(file, option.buf_size);
65 Ok(Self {
66 fs,
67 buf_writer,
68 path: option.path,
69 _mark: PhantomData,
70 })
71 }
72}
73
74impl<T> Logger<T>
75where
76 T: Encode,
77{
78 pub async fn write_batch<'r>(
80 &mut self,
81 data: impl ExactSizeIterator<Item = &'r T>,
82 ) -> Result<(), LogError>
83 where
84 T: 'r,
85 {
86 let mut writer = HashWriter::new(&mut self.buf_writer);
87 (data.len() as u32).encode(&mut writer).await?;
88 for e in data {
89 e.encode(&mut writer)
90 .await
91 .map_err(|err| LogError::Encode {
92 message: err.to_string(),
93 })?;
94 }
95 writer.eol().await?;
96 Ok(())
97 }
98
99 pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
102 let mut writer = HashWriter::new(&mut self.buf_writer);
103
104 1_u32.encode(&mut writer).await.unwrap();
105 data.encode(&mut writer)
106 .await
107 .map_err(|err| LogError::Encode {
108 message: err.to_string(),
109 })?;
110 writer.eol().await?;
111 Ok(())
112 }
113
114 pub async fn flush(&mut self) -> Result<(), LogError> {
116 self.buf_writer.flush().await?;
117 Ok(())
118 }
119
120 pub async fn close(&mut self) -> Result<(), LogError> {
123 self.buf_writer.close().await?;
124 Ok(())
125 }
126}
127
128impl<T> Logger<T>
129where
130 T: Decode,
131{
132 pub(crate) async fn recover(
133 option: Options,
134 ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
135 let fs = option.fs_option.parse()?;
136 Self::recover_with_fs(&option.path, fs).await
137 }
138
139 pub(crate) async fn recover_with_fs(
140 path: &Path,
141 fs: Arc<dyn DynFs>,
142 ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
143 let file = BufReader::new(
144 fs.open_options(&path, OpenOptions::default().create(false))
145 .await?,
146 DEFAULT_BUF_SIZE,
147 )
148 .await?;
149
150 Ok(Box::pin(stream::try_unfold(
151 (file, 0),
152 |(mut f, mut pos)| async move {
153 let mut cursor = Cursor::new(&mut f);
154 cursor.set_position(pos);
155 let mut reader = HashReader::new(cursor);
156
157 let Ok(len) = u32::decode(&mut reader).await else {
158 return Ok(None);
159 };
160 let mut buf = Vec::with_capacity(len as usize);
161 for _ in 0..len {
162 match T::decode(&mut reader).await {
163 Ok(record) => {
164 buf.push(record);
165 }
166 Err(err) => {
167 return Err(LogError::Decode {
168 message: err.to_string(),
169 });
170 }
171 }
172 }
173
174 pos += reader.position();
175 if !reader.checksum().await? {
176 return Err(LogError::Checksum);
177 }
178 pos += size_of::<u32>() as u64;
179
180 Ok(Some((buf, (f, pos))))
181 },
182 )))
183 }
184}
185
186impl<T> Logger<T> {
187 pub async fn remove(self) -> Result<(), LogError> {
189 self.fs.remove(&self.path).await?;
190 Ok(())
191 }
192}
193
#[cfg(test)]
mod tests {

    use std::pin::pin;

    use fusio::Error;
    use futures_util::{StreamExt, TryStreamExt};
    use tempfile::TempDir;

    use crate::{
        fs::{AwsCredential, SeqRead, Write},
        Decode, Encode, FsOptions, Options, Path,
    };

    /// Multi-field record used to exercise encoding of composite types.
    #[derive(Debug, Clone)]
    struct TestStruct {
        id: u64,
        name: String,
        email: Option<String>,
    }

    impl Encode for TestStruct {
        async fn encode<W>(&self, writer: &mut W) -> Result<(), Error>
        where
            W: Write,
        {
            // Propagate every field's failure instead of panicking mid-record.
            self.id.encode(writer).await?;
            self.name.encode(writer).await?;
            self.email.encode(writer).await?;
            Ok(())
        }

        fn size(&self) -> usize {
            self.id.size() + self.name.size() + self.email.size()
        }
    }

    impl Decode for TestStruct {
        async fn decode<R>(reader: &mut R) -> Result<Self, Error>
        where
            R: SeqRead,
        {
            // Fields are decoded in the same order `encode` writes them.
            let id = u64::decode(reader).await?;
            let name = String::decode(reader).await?;
            let email = Option::<String>::decode(reader).await?;
            Ok(Self { id, name, email })
        }
    }

    /// Builds 50 distinct records for batch round-trip tests.
    fn test_items() -> Vec<TestStruct> {
        (0..50)
            .map(|i| TestStruct {
                id: i,
                name: format!("Tonbo{}", i),
                email: Some(format!("fusio{}@tonboio.com", i)),
            })
            .collect()
    }

    /// Writes three `u8` batches to a local file and checks they are recovered
    /// in order.
    async fn write_u8() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("u8");

        {
            let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
            logger.write(&1).await.unwrap();
            logger.write_batch([2, 3, 4].iter()).await.unwrap();
            logger
                .write_batch([2, 3, 4, 5, 1, 255].iter())
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
            let stream = Options::new(path)
                .recover::<u8>()
                .await
                .unwrap()
                .into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.unwrap();
                assert_eq!(expected[i], batch);
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_u8() {
        write_u8().await;
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_write_u8() {
        write_u8().await;
    }

    /// Writes one single-record batch and one 50-record batch of `TestStruct`
    /// to a local file and checks they are recovered intact.
    async fn write_struct() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("struct");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger
                .write(&TestStruct {
                    id: 100,
                    name: "Name".to_string(),
                    email: None,
                })
                .await
                .unwrap();
            logger.write_batch(test_items().iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let single = [TestStruct {
                id: 100,
                name: "Name".to_string(),
                email: None,
            }];
            let items = test_items();
            let expected: [&[TestStruct]; 2] = [&single, &items];
            let stream = Options::new(path)
                .recover::<TestStruct>()
                .await
                .unwrap()
                .into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.unwrap();
                // `zip` stops at the shorter side, so check the length first.
                assert_eq!(expected[i].len(), batch.len());
                for (left, right) in expected[i].iter().zip(batch) {
                    assert_eq!(left.id, right.id);
                    assert_eq!(left.name, right.name);
                    assert_eq!(left.email, right.email);
                }
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_struct() {
        write_struct().await
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_write_struct() {
        write_struct().await
    }

    /// Round-trips `TestStruct` batches through S3.
    ///
    /// Credentials are read at compile time via `option_env!`; the test returns
    /// early when the access key or secret key was not set at build time.
    async fn write_s3() {
        let path = Path::from_url_path("log").unwrap();

        let (Some(key_id), Some(secret_key)) = (
            option_env!("AWS_ACCESS_KEY_ID"),
            option_env!("AWS_SECRET_ACCESS_KEY"),
        ) else {
            eprintln!("can not get `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`");
            return;
        };
        let bucket = std::option_env!("BUCKET_NAME")
            .expect("expected bucket not to be empty")
            .to_string();
        let region = std::option_env!("AWS_REGION")
            .expect("expected region not to be empty")
            .to_string();
        let token = std::option_env!("AWS_SESSION_TOKEN").map(|v| v.to_string());

        let option = Options::new(path).truncate(true).fs(FsOptions::S3 {
            bucket,
            credential: Some(AwsCredential {
                key_id: key_id.to_string(),
                secret_key: secret_key.to_string(),
                token,
            }),
            endpoint: None,
            region: Some(region),
            sign_payload: None,
            checksum: None,
        });

        {
            let mut logger = option.clone().build::<TestStruct>().await.unwrap();
            logger
                .write(&TestStruct {
                    id: 100,
                    name: "Name".to_string(),
                    email: None,
                })
                .await
                .unwrap();
            logger.write_batch(test_items().iter()).await.unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let single = [TestStruct {
                id: 100,
                name: "Name".to_string(),
                email: None,
            }];
            let items = test_items();
            let expected: [&[TestStruct]; 2] = [&single, &items];
            let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.unwrap();
                // `zip` stops at the shorter side, so check the length first.
                assert_eq!(expected[i].len(), batch.len());
                for (left, right) in expected[i].iter().zip(batch) {
                    assert_eq!(left.id, right.id);
                    assert_eq!(left.name, right.name);
                    assert_eq!(left.email, right.email);
                }
                i += 1;
            }
            // Without this, recovering zero batches would pass silently.
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(all(feature = "tokio-http", feature = "aws"))]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_write_s3() {
        write_s3().await;
    }

    #[ignore = "s3"]
    #[cfg(all(feature = "monoio-http", feature = "aws"))]
    #[monoio::test(enable_timer = true)]
    async fn test_monoio_write_s3() {
        write_s3().await;
    }

    /// Recovering a log that was created but never written yields no batches.
    async fn recover_empty() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("empty");

        {
            let mut logger = Options::new(path.clone())
                .build::<TestStruct>()
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
            let res = stream.try_next().await.unwrap();
            assert!(res.is_none());
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_recover_empty() {
        recover_empty().await;
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_recover_empty() {
        recover_empty().await;
    }

    /// Same round trip as `write_u8`, but with the logger built via
    /// `disable_buf()`.
    async fn disable_buffer() {
        let temp_dir = TempDir::new().unwrap();
        let path = Path::from_filesystem_path(temp_dir.path())
            .unwrap()
            .child("disable_buf");

        {
            let mut logger = Options::new(path.clone())
                .disable_buf()
                .build::<u8>()
                .await
                .unwrap();
            logger.write(&1).await.unwrap();
            logger.write_batch([2, 3, 4].iter()).await.unwrap();
            logger
                .write_batch([2, 3, 4, 5, 1, 255].iter())
                .await
                .unwrap();
            logger.flush().await.unwrap();
            logger.close().await.unwrap();
        }
        {
            let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
            let stream = Options::new(path)
                .recover::<u8>()
                .await
                .unwrap()
                .into_stream();
            let mut stream = pin!(stream);
            let mut i = 0;
            while let Some(res) = stream.next().await {
                let batch = res.unwrap();
                assert_eq!(expected[i], batch);
                i += 1;
            }
            assert_eq!(i, expected.len())
        }
    }

    #[cfg(feature = "tokio")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_tokio_disable_buffer() {
        disable_buffer().await;
    }

    #[cfg(feature = "monoio")]
    #[monoio::test]
    async fn test_monoio_disable_buffer() {
        disable_buffer().await;
    }
}