1pub mod error;
2pub mod fs;
3mod option;
4mod serdes;
5use std::{io::Cursor, marker::PhantomData, sync::Arc};
6
7use error::LogError;
8use fs::hash::{HashReader, HashWriter};
9pub use fusio::path::Path;
10use fusio::{buffered::BufWriter, dynamic::DynFile, fs::OpenOptions, DynFs, DynWrite};
11use futures_core::TryStream;
12use futures_util::stream;
13#[allow(unused)]
14pub use option::*;
15pub use serdes::*;
16
/// Writer for a log file holding records of type `T`.
///
/// Bundles the open log file (behind a buffered writer) with the filesystem
/// handle and path it was opened from, so the same log can later be deleted
/// through [`Logger::remove`].
pub struct Logger<T> {
    /// Location of the log file on `fs`.
    path: Path,
    /// Filesystem the log file was opened on; also used to remove the file.
    fs: Arc<dyn DynFs>,
    /// Buffered writer wrapping the open log file.
    buf_writer: BufWriter<Box<dyn DynFile>>,
    /// Binds the logger to its record type without storing a `T`.
    _mark: PhantomData<T>,
}
23
24impl<T> Logger<T>
25where
26 T: Encode,
27{
28 pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
29 let fs = option.fs_option.parse()?;
30 let file = fs
31 .open_options(
32 &option.path,
33 OpenOptions::default()
34 .read(true)
35 .write(true)
36 .create(true)
37 .truncate(option.truncate),
38 )
39 .await?;
40
41 let buf_writer = BufWriter::new(file, option.buf_size);
42 Ok(Self {
43 fs,
44 buf_writer,
45 path: option.path,
46 _mark: PhantomData,
47 })
48 }
49
50 pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
51 let file = fs
52 .open_options(
53 &option.path,
54 OpenOptions::default().read(true).write(true).create(true),
55 )
56 .await?;
57
58 let buf_writer = BufWriter::new(file, option.buf_size);
59 Ok(Self {
60 fs,
61 buf_writer,
62 path: option.path,
63 _mark: PhantomData,
64 })
65 }
66}
67
68impl<T> Logger<T>
69where
70 T: Encode,
71{
72 pub async fn write_batch<'r>(
73 &mut self,
74 data: impl ExactSizeIterator<Item = &'r T>,
75 ) -> Result<(), LogError>
76 where
77 T: 'r,
78 {
79 let mut writer = HashWriter::new(&mut self.buf_writer);
80 (data.len() as u32).encode(&mut writer).await?;
81 for e in data {
82 e.encode(&mut writer)
83 .await
84 .map_err(|err| LogError::Encode {
85 message: err.to_string(),
86 })?;
87 }
88 writer.eol().await?;
89 Ok(())
90 }
91
92 pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
93 let mut writer = HashWriter::new(&mut self.buf_writer);
94
95 1_u32.encode(&mut writer).await.unwrap();
96 data.encode(&mut writer)
97 .await
98 .map_err(|err| LogError::Encode {
99 message: err.to_string(),
100 })?;
101 writer.eol().await?;
102 Ok(())
103 }
104
105 pub async fn flush(&mut self) -> Result<(), LogError> {
106 self.buf_writer.flush().await?;
107 Ok(())
108 }
109
110 pub async fn close(&mut self) -> Result<(), LogError> {
111 self.buf_writer.close().await?;
112 Ok(())
113 }
114}
115
116impl<T> Logger<T>
117where
118 T: Decode,
119{
120 pub(crate) async fn recover(
121 option: Options,
122 ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
123 let fs = option.fs_option.parse()?;
124 let file = fs
125 .open_options(&option.path, OpenOptions::default().create(false))
126 .await?;
127
128 Ok(Box::pin(stream::try_unfold(
129 (file, 0),
130 |(mut f, mut pos)| async move {
131 let mut cursor = Cursor::new(&mut f);
132 cursor.set_position(pos);
133 let mut reader = HashReader::new(cursor);
134
135 let Ok(len) = u32::decode(&mut reader).await else {
136 f.close().await?;
137 return Ok(None);
138 };
139 let mut buf = Vec::with_capacity(len as usize);
140 for _ in 0..len {
141 match T::decode(&mut reader).await {
142 Ok(record) => {
143 buf.push(record);
144 }
145 Err(err) => {
146 return Err(LogError::Decode {
147 message: err.to_string(),
148 });
149 }
150 }
151 }
152
153 pos += reader.position();
154 if !reader.checksum().await? {
155 return Err(LogError::Checksum);
156 }
157 pos += size_of::<u32>() as u64;
158
159 Ok(Some((buf, (f, pos))))
160 },
161 )))
162 }
163}
164
165impl<T> Logger<T> {
166 pub async fn remove(self) -> Result<(), LogError> {
168 self.fs.remove(&self.path).await?;
169 Ok(())
170 }
171}
172
173#[cfg(test)]
174mod tests {
175
176 use futures_util::{StreamExt, TryStreamExt};
177 use tempfile::TempDir;
178 use tokio::pin;
179
180 use crate::{
181 fs::{AwsCredential, SeqRead, Write},
182 Decode, Encode, FsOptions, Options, Path,
183 };
184
    /// Sample record type used by the tests in this module to exercise
    /// `Encode`/`Decode` on a multi-field struct.
    #[derive(Debug, Clone)]
    struct TestStruct {
        /// Numeric identifier; encoded first.
        id: u64,
        /// Display name; encoded second.
        name: String,
        /// Optional address; encoded last.
        email: Option<String>,
    }
191
192 impl Encode for TestStruct {
193 type Error = fusio::Error;
194
195 async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
196 where
197 W: Write,
198 {
199 self.id.encode(writer).await?;
200 self.name.encode(writer).await.unwrap();
201 self.email.encode(writer).await.unwrap();
202 Ok(())
203 }
204
205 fn size(&self) -> usize {
206 self.id.size() + self.name.size() + self.email.size()
207 }
208 }
209
210 impl Decode for TestStruct {
211 type Error = fusio::Error;
212
213 async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
214 where
215 R: SeqRead,
216 {
217 let id = u64::decode(reader).await?;
218 let name = String::decode(reader).await.unwrap();
219 let email = Option::<String>::decode(reader).await.unwrap();
220 Ok(Self { id, name, email })
221 }
222 }
223
224 fn test_items() -> Vec<TestStruct> {
225 let mut items = vec![];
226 for i in 0..50 {
227 items.push(TestStruct {
228 id: i,
229 name: format!("Tonbo{}", i),
230 email: Some(format!("fusio{}@tonboio.com", i)),
231 });
232 }
233 items
234 }
235
236 #[tokio::test]
237 async fn test_write_u8() {
238 let temp_dir = TempDir::new().unwrap();
239 let path = Path::from_filesystem_path(temp_dir.path())
240 .unwrap()
241 .child("u8");
242
243 {
244 let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
245 logger.write(&1).await.unwrap();
246 logger.write_batch([2, 3, 4].iter()).await.unwrap();
247 logger
248 .write_batch([2, 3, 4, 5, 1, 255].iter())
249 .await
250 .unwrap();
251 logger.flush().await.unwrap();
252 logger.close().await.unwrap();
253 }
254 {
255 let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
256 let stream = Options::new(path)
257 .recover::<u8>()
258 .await
259 .unwrap()
260 .into_stream();
261 pin!(stream);
262 let mut i = 0;
263 while let Some(res) = stream.next().await {
264 assert!(res.is_ok());
265 assert_eq!(&expected[i], &res.unwrap());
266 i += 1;
267 }
268 assert_eq!(i, expected.len())
269 }
270 }
271
272 #[tokio::test]
273 async fn test_write_struct() {
274 let temp_dir = TempDir::new().unwrap();
275 let path = Path::from_filesystem_path(temp_dir.path())
276 .unwrap()
277 .child("struct");
278
279 {
280 let mut logger = Options::new(path.clone())
281 .build::<TestStruct>()
282 .await
283 .unwrap();
284 logger
285 .write(&TestStruct {
286 id: 100,
287 name: "Name".to_string(),
288 email: None,
289 })
290 .await
291 .unwrap();
292 logger.write_batch(test_items().iter()).await.unwrap();
293 logger.flush().await.unwrap();
294 logger.close().await.unwrap();
295 }
296 {
297 let expected = [
298 &[TestStruct {
299 id: 100,
300 name: "Name".to_string(),
301 email: None,
302 }],
303 &test_items()[0..],
304 ];
305 let stream = Options::new(path)
306 .recover::<TestStruct>()
307 .await
308 .unwrap()
309 .into_stream();
310 pin!(stream);
311 let mut i = 0;
312 while let Some(res) = stream.next().await {
313 assert!(res.is_ok());
314 for (left, right) in expected[i].iter().zip(res.unwrap()) {
315 assert_eq!(left.id, right.id);
316 assert_eq!(left.email, right.email);
317 }
318 i += 1;
319 }
320 assert_eq!(i, expected.len())
321 }
322 }
323
324 #[ignore = "s3"]
325 #[tokio::test]
326 async fn test_write_s3() {
327 let path = Path::from_url_path("log").unwrap();
328 let option = Options::new(path).fs(FsOptions::S3 {
329 bucket: "data".to_string(),
330 credential: Some(AwsCredential {
331 key_id: "key_id".to_string(),
332 secret_key: "secret_key".to_string(),
333 token: None,
334 }),
335 endpoint: None,
336 region: Some("region".to_string()),
337 sign_payload: None,
338 checksum: None,
339 });
340
341 {
342 let mut logger = option.clone().build::<TestStruct>().await.unwrap();
343 logger
344 .write(&TestStruct {
345 id: 100,
346 name: "Name".to_string(),
347 email: None,
348 })
349 .await
350 .unwrap();
351 logger.write_batch(test_items().iter()).await.unwrap();
352 logger.flush().await.unwrap();
353 logger.close().await.unwrap();
354 }
355 {
356 let expected = [
357 &[TestStruct {
358 id: 100,
359 name: "Name".to_string(),
360 email: None,
361 }],
362 &test_items()[0..],
363 ];
364 let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
365 pin!(stream);
366 let mut i = 0;
367 while let Some(res) = stream.next().await {
368 assert!(res.is_ok());
369 for (left, right) in expected[i].iter().zip(res.unwrap()) {
370 assert_eq!(left.id, right.id);
371 assert_eq!(left.email, right.email);
372 }
373 i += 1;
374 }
375 }
376 }
377
378 #[tokio::test]
379 async fn test_recover_empty() {
380 let temp_dir = TempDir::new().unwrap();
381 let path = Path::from_filesystem_path(temp_dir.path())
382 .unwrap()
383 .child("empty");
384
385 {
386 let mut logger = Options::new(path.clone())
387 .build::<TestStruct>()
388 .await
389 .unwrap();
390 logger.flush().await.unwrap();
391 logger.close().await.unwrap();
392 }
393 {
394 let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
395 let res = stream.try_next().await.unwrap();
396 assert!(res.is_none());
397 }
398 }
399}