1pub mod error;
2pub mod fs;
3mod option;
4mod serdes;
5use std::{io::Cursor, marker::PhantomData, sync::Arc};
6
7use error::LogError;
8use fs::hash::{HashReader, HashWriter};
9pub use fusio::path::Path;
10use fusio::{buffered::BufWriter, dynamic::DynFile, fs::OpenOptions, DynFs, DynWrite};
11use futures_core::TryStream;
12use futures_util::stream;
13#[allow(unused)]
14pub use option::*;
15pub use serdes::*;
16
17pub struct Logger<T> {
18 path: Path,
19 fs: Arc<dyn DynFs>,
20 buf_writer: BufWriter<Box<dyn DynFile>>,
21 _mark: PhantomData<T>,
22}
23
24impl<T> Logger<T>
25where
26 T: Encode,
27{
28 pub(crate) async fn new(option: Options) -> Result<Self, LogError> {
29 let fs = option.fs_option.parse()?;
30 let file = fs
31 .open_options(
32 &option.path,
33 OpenOptions::default()
34 .read(true)
35 .write(true)
36 .create(true)
37 .truncate(option.truncate),
38 )
39 .await?;
40
41 let buf_writer = BufWriter::new(file, option.buf_size);
42 Ok(Self {
43 fs,
44 buf_writer,
45 path: option.path,
46 _mark: PhantomData,
47 })
48 }
49
50 pub(crate) async fn with_fs(fs: Arc<dyn DynFs>, option: Options) -> Result<Self, LogError> {
51 let file = fs
52 .open_options(
53 &option.path,
54 OpenOptions::default()
55 .read(true)
56 .write(true)
57 .create(true)
58 .truncate(option.truncate),
59 )
60 .await?;
61
62 let buf_writer = BufWriter::new(file, option.buf_size);
63 Ok(Self {
64 fs,
65 buf_writer,
66 path: option.path,
67 _mark: PhantomData,
68 })
69 }
70}
71
72impl<T> Logger<T>
73where
74 T: Encode,
75{
76 pub async fn write_batch<'r>(
77 &mut self,
78 data: impl ExactSizeIterator<Item = &'r T>,
79 ) -> Result<(), LogError>
80 where
81 T: 'r,
82 {
83 let mut writer = HashWriter::new(&mut self.buf_writer);
84 (data.len() as u32).encode(&mut writer).await?;
85 for e in data {
86 e.encode(&mut writer)
87 .await
88 .map_err(|err| LogError::Encode {
89 message: err.to_string(),
90 })?;
91 }
92 writer.eol().await?;
93 Ok(())
94 }
95
96 pub async fn write(&mut self, data: &T) -> Result<(), LogError> {
97 let mut writer = HashWriter::new(&mut self.buf_writer);
98
99 1_u32.encode(&mut writer).await.unwrap();
100 data.encode(&mut writer)
101 .await
102 .map_err(|err| LogError::Encode {
103 message: err.to_string(),
104 })?;
105 writer.eol().await?;
106 Ok(())
107 }
108
109 pub async fn flush(&mut self) -> Result<(), LogError> {
110 self.buf_writer.flush().await?;
111 Ok(())
112 }
113
114 pub async fn close(&mut self) -> Result<(), LogError> {
115 self.buf_writer.close().await?;
116 Ok(())
117 }
118}
119
120impl<T> Logger<T>
121where
122 T: Decode,
123{
124 pub(crate) async fn recover(
125 option: Options,
126 ) -> Result<impl TryStream<Ok = Vec<T>, Error = LogError> + Unpin, LogError> {
127 let fs = option.fs_option.parse()?;
128 let file = fs
129 .open_options(&option.path, OpenOptions::default().create(false))
130 .await?;
131
132 Ok(Box::pin(stream::try_unfold(
133 (file, 0),
134 |(mut f, mut pos)| async move {
135 let mut cursor = Cursor::new(&mut f);
136 cursor.set_position(pos);
137 let mut reader = HashReader::new(cursor);
138
139 let Ok(len) = u32::decode(&mut reader).await else {
140 f.close().await?;
141 return Ok(None);
142 };
143 let mut buf = Vec::with_capacity(len as usize);
144 for _ in 0..len {
145 match T::decode(&mut reader).await {
146 Ok(record) => {
147 buf.push(record);
148 }
149 Err(err) => {
150 return Err(LogError::Decode {
151 message: err.to_string(),
152 });
153 }
154 }
155 }
156
157 pos += reader.position();
158 if !reader.checksum().await? {
159 return Err(LogError::Checksum);
160 }
161 pos += size_of::<u32>() as u64;
162
163 Ok(Some((buf, (f, pos))))
164 },
165 )))
166 }
167}
168
169impl<T> Logger<T> {
170 pub async fn remove(self) -> Result<(), LogError> {
172 self.fs.remove(&self.path).await?;
173 Ok(())
174 }
175}
176
177#[cfg(test)]
178mod tests {
179
180 use futures_util::{StreamExt, TryStreamExt};
181 use tempfile::TempDir;
182 use tokio::pin;
183
184 use crate::{
185 fs::{AwsCredential, SeqRead, Write},
186 Decode, Encode, FsOptions, Options, Path,
187 };
188
189 #[derive(Debug, Clone)]
190 struct TestStruct {
191 id: u64,
192 name: String,
193 email: Option<String>,
194 }
195
196 impl Encode for TestStruct {
197 type Error = fusio::Error;
198
199 async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
200 where
201 W: Write,
202 {
203 self.id.encode(writer).await?;
204 self.name.encode(writer).await.unwrap();
205 self.email.encode(writer).await.unwrap();
206 Ok(())
207 }
208
209 fn size(&self) -> usize {
210 self.id.size() + self.name.size() + self.email.size()
211 }
212 }
213
214 impl Decode for TestStruct {
215 type Error = fusio::Error;
216
217 async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
218 where
219 R: SeqRead,
220 {
221 let id = u64::decode(reader).await?;
222 let name = String::decode(reader).await.unwrap();
223 let email = Option::<String>::decode(reader).await.unwrap();
224 Ok(Self { id, name, email })
225 }
226 }
227
228 fn test_items() -> Vec<TestStruct> {
229 let mut items = vec![];
230 for i in 0..50 {
231 items.push(TestStruct {
232 id: i,
233 name: format!("Tonbo{}", i),
234 email: Some(format!("fusio{}@tonboio.com", i)),
235 });
236 }
237 items
238 }
239
240 #[tokio::test]
241 async fn test_write_u8() {
242 let temp_dir = TempDir::new().unwrap();
243 let path = Path::from_filesystem_path(temp_dir.path())
244 .unwrap()
245 .child("u8");
246
247 {
248 let mut logger = Options::new(path.clone()).build::<u8>().await.unwrap();
249 logger.write(&1).await.unwrap();
250 logger.write_batch([2, 3, 4].iter()).await.unwrap();
251 logger
252 .write_batch([2, 3, 4, 5, 1, 255].iter())
253 .await
254 .unwrap();
255 logger.flush().await.unwrap();
256 logger.close().await.unwrap();
257 }
258 {
259 let expected = [vec![1], vec![2, 3, 4], vec![2, 3, 4, 5, 1, 255]];
260 let stream = Options::new(path)
261 .recover::<u8>()
262 .await
263 .unwrap()
264 .into_stream();
265 pin!(stream);
266 let mut i = 0;
267 while let Some(res) = stream.next().await {
268 assert!(res.is_ok());
269 assert_eq!(&expected[i], &res.unwrap());
270 i += 1;
271 }
272 assert_eq!(i, expected.len())
273 }
274 }
275
276 #[tokio::test]
277 async fn test_write_struct() {
278 let temp_dir = TempDir::new().unwrap();
279 let path = Path::from_filesystem_path(temp_dir.path())
280 .unwrap()
281 .child("struct");
282
283 {
284 let mut logger = Options::new(path.clone())
285 .build::<TestStruct>()
286 .await
287 .unwrap();
288 logger
289 .write(&TestStruct {
290 id: 100,
291 name: "Name".to_string(),
292 email: None,
293 })
294 .await
295 .unwrap();
296 logger.write_batch(test_items().iter()).await.unwrap();
297 logger.flush().await.unwrap();
298 logger.close().await.unwrap();
299 }
300 {
301 let expected = [
302 &[TestStruct {
303 id: 100,
304 name: "Name".to_string(),
305 email: None,
306 }],
307 &test_items()[0..],
308 ];
309 let stream = Options::new(path)
310 .recover::<TestStruct>()
311 .await
312 .unwrap()
313 .into_stream();
314 pin!(stream);
315 let mut i = 0;
316 while let Some(res) = stream.next().await {
317 assert!(res.is_ok());
318 for (left, right) in expected[i].iter().zip(res.unwrap()) {
319 assert_eq!(left.id, right.id);
320 assert_eq!(left.email, right.email);
321 }
322 i += 1;
323 }
324 assert_eq!(i, expected.len())
325 }
326 }
327
328 #[ignore = "s3"]
329 #[tokio::test]
330 async fn test_write_s3() {
331 let path = Path::from_url_path("log").unwrap();
332 let option = Options::new(path).fs(FsOptions::S3 {
333 bucket: "data".to_string(),
334 credential: Some(AwsCredential {
335 key_id: "key_id".to_string(),
336 secret_key: "secret_key".to_string(),
337 token: None,
338 }),
339 endpoint: None,
340 region: Some("region".to_string()),
341 sign_payload: None,
342 checksum: None,
343 });
344
345 {
346 let mut logger = option.clone().build::<TestStruct>().await.unwrap();
347 logger
348 .write(&TestStruct {
349 id: 100,
350 name: "Name".to_string(),
351 email: None,
352 })
353 .await
354 .unwrap();
355 logger.write_batch(test_items().iter()).await.unwrap();
356 logger.flush().await.unwrap();
357 logger.close().await.unwrap();
358 }
359 {
360 let expected = [
361 &[TestStruct {
362 id: 100,
363 name: "Name".to_string(),
364 email: None,
365 }],
366 &test_items()[0..],
367 ];
368 let stream = option.recover::<TestStruct>().await.unwrap().into_stream();
369 pin!(stream);
370 let mut i = 0;
371 while let Some(res) = stream.next().await {
372 assert!(res.is_ok());
373 for (left, right) in expected[i].iter().zip(res.unwrap()) {
374 assert_eq!(left.id, right.id);
375 assert_eq!(left.email, right.email);
376 }
377 i += 1;
378 }
379 }
380 }
381
382 #[tokio::test]
383 async fn test_recover_empty() {
384 let temp_dir = TempDir::new().unwrap();
385 let path = Path::from_filesystem_path(temp_dir.path())
386 .unwrap()
387 .child("empty");
388
389 {
390 let mut logger = Options::new(path.clone())
391 .build::<TestStruct>()
392 .await
393 .unwrap();
394 logger.flush().await.unwrap();
395 logger.close().await.unwrap();
396 }
397 {
398 let mut stream = Options::new(path).recover::<TestStruct>().await.unwrap();
399 let res = stream.try_next().await.unwrap();
400 assert!(res.is_none());
401 }
402 }
403}