cyfs_bdt/utils/ndn/
tracked_store.rs

1use log::*;
2use std::{
3    sync::{Arc},
4    path::{Path, PathBuf},
5};
6use async_std::{
7    prelude::*, 
8    fs::{self, OpenOptions}, 
9    io::{SeekFrom, Cursor}, 
10};
11use cyfs_base::*;
12use cyfs_util::*;
13use crate::{ 
14    ndn::*
15};
16
17
18struct StoreImpl {
19    ndc: Box<dyn NamedDataCache>, 
20    tracker: Box<dyn TrackerCache>, 
21}
22
23
24#[derive(Clone)]
25pub struct TrackedChunkStore(Arc<StoreImpl>);
26
27impl TrackedChunkStore {
28    pub fn new(
29        ndc: Box<dyn NamedDataCache>, 
30        tracker: Box<dyn TrackerCache>, 
31    ) -> Self {
32        Self(Arc::new(StoreImpl { 
33            ndc, 
34            tracker, 
35        }))
36    }
37
38    pub async fn track_chunk(&self, chunk: &ChunkId) -> BuckyResult<()> {
39        let request = InsertChunkRequest {
40            chunk_id: chunk.to_owned(),
41            state: ChunkState::Unknown,
42            ref_objects: None,
43            trans_sessions: None,
44            flags: 0,
45        };
46
47        self.ndc().insert_chunk(&request).await.map_err(|e| {
48            error!("record file chunk to ndc error! chunk={}, {}",chunk, e);
49            e
50        })
51    }
52
53    pub async fn track_file(&self, file: &File) -> BuckyResult<()> {
54        let file_id = file.desc().calculate_id();
55        match file.body() {
56            Some(body) => {
57                let chunk_list = body.content().inner_chunk_list();
58                match chunk_list {
59                    Some(chunks) => {
60                        for chunk in chunks {
61                            // 先添加到chunk索引
62                            let ref_obj = ChunkObjectRef {
63                                object_id: file_id.to_owned(),
64                                relation: ChunkObjectRelation::FileBody,
65                            };
66                
67                            let req = InsertChunkRequest {
68                                chunk_id: chunk.to_owned(),
69                                state: ChunkState::Unknown,
70                                ref_objects: Some(vec![ref_obj]),
71                                trans_sessions: None,
72                                flags: 0,
73                            };
74                
75                            self.ndc().insert_chunk(&req).await.map_err(|e| {
76                                error!("record file chunk to ndc error! file={}, chunk={}, {}", file_id, chunk, e);
77                                e
78                            })?;
79
80                            info!("insert chunk of file to ndc, chunk:{}, file:{}", chunk, file_id);
81                        }
82                        Ok(())
83                    }
84                    None => Err(BuckyError::new(
85                        BuckyErrorCode::NotSupport,
86                        format!("file object should has chunk list: {}", file_id),
87                    )),
88                }
89            }
90            None => {
91                Err(BuckyError::new(
92                    BuckyErrorCode::InvalidFormat,
93                    format!("file object should has body: {}", file_id),
94                ))
95            }
96        }
97    }
98
99
100    pub async fn track_file_in_path(
101        &self, 
102        file: File, 
103        path: PathBuf 
104    ) -> BuckyResult<()> {
105        let _ = self.track_file(&file).await?;
106        TrackedChunkListWriter::new(
107            path, 
108            &ChunkListDesc::from_file(&file)?,  
109            self.ndc(), 
110            self.tracker()
111        ).track_path().await
112    }
113
114    fn ndc(&self) -> &dyn NamedDataCache {
115        self.0.ndc.as_ref()
116    }
117
118    fn tracker(&self) -> &dyn TrackerCache {
119        self.0.tracker.as_ref()
120    }
121
122    
123    async fn read_chunk_from_file(chunk: &ChunkId, path: &Path, offset: u64) -> BuckyResult<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>> {
124        debug!("begin read {} from file {:?}", chunk, path);
125        let mut file = OpenOptions::new()
126            .read(true)
127            .open(path)
128            .await
129            .map_err(|e| {
130                let msg = format!("open file {:?} failed for {}", path, e);
131                error!("{}", msg);
132                BuckyError::new(BuckyErrorCode::IoError, msg)
133            })?;
134
135        let actual_offset = file.seek(SeekFrom::Start(offset)).await.map_err(|e| {
136            let msg = format!("seek file {:?} to offset {} failed for {}", path, offset, e);
137            error!("{}", msg);
138
139            BuckyError::new(BuckyErrorCode::IoError, msg)
140        })?;
141
142        if actual_offset != offset {
143            let msg = format!(
144                "seek file {:?} to offset {} actual offset {}",
145                path, offset, actual_offset
146            );
147            error!("{}", msg);
148
149            return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
150        }
151
152        let mut content = Vec::with_capacity(chunk.len());
153        unsafe { content.set_len(chunk.len()) };
154        file.read_exact(&mut content).await.map_err(|e| {
155            let msg = format!(
156                "read chunk from file {:?} error, chunk={}, len={}, {}",
157                path,
158                chunk,
159                chunk.len(),
160                e
161            );
162            error!("{}", msg);
163
164            BuckyError::new(BuckyErrorCode::IoError, msg)
165        })?;
166
167        let actual_id = ChunkId::calculate(content.as_slice()).await?;
168
169        if actual_id.eq(chunk) {
170            debug!("read {} from file {:?}", chunk, path);
171            Ok(Box::new(Cursor::new(content)))
172        } else {
173            let msg = format!("content in file {:?} not match chunk id", path);
174            error!("{}", msg);
175            Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
176        }
177    }
178
179    async fn is_chunk_stored_in_file(&self, chunk: &ChunkId, path: &Path) -> BuckyResult<bool> {
180        let request = GetTrackerPositionRequest {
181            id: chunk.to_string(),
182            direction: Some(TrackerDirection::Store),
183        };
184        let ret = self.0.tracker.get_position(&request).await?;
185        if ret.len() == 0 {
186            Ok(false)
187        } else {
188            for c in ret {
189                match &c.pos {
190                    TrackerPostion::File(exists) => {
191                        if path.eq(Path::new(exists)) {
192                            return Ok(true);
193                        }
194                    }
195                    TrackerPostion::FileRange(fr) => {
196                        if path.eq(Path::new(&fr.path)) {
197                            return Ok(true);
198                        }
199                    }
200                    _ => {}
201                }
202            }
203            Ok(false)
204        }
205    }
206}
207
208
209#[async_trait::async_trait]
210impl ChunkReader for TrackedChunkStore {
211    fn clone_as_reader(&self) -> Box<dyn ChunkReader> {
212        Box::new(self.clone())
213    }
214
215    async fn exists(&self, chunk: &ChunkId) -> bool {
216        let request = GetChunkRequest {
217            chunk_id: chunk.clone(),
218            flags: 0,
219        };
220        match self.ndc().get_chunk(&request).await {
221            Ok(c) => {
222                if let Some(c) = c {
223                    c.state == ChunkState::Ready
224                } else {
225                    false
226                }
227            }
228            Err(e) => {
229                error!("got chunk state {} from database failed for {}", chunk, e);
230                false
231            }
232        }
233    }
234
235    async fn get(&self, chunk: &ChunkId) -> BuckyResult<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>> {
236        let request = GetTrackerPositionRequest {
237            id: chunk.to_string(),
238            direction: Some(TrackerDirection::Store),
239        };
240        let ret = self.tracker().get_position(&request).await?;
241        if ret.len() == 0 {
242            Err(BuckyError::new(
243                BuckyErrorCode::NotFound,
244                "chunk not exists",
245            ))
246        } else {
247            for c in ret {
248                let read_ret = match &c.pos {
249                    //FIXME
250                    TrackerPostion::File(path) => {
251                        Self::read_chunk_from_file(chunk, Path::new(path), 0).await
252                    }
253                    TrackerPostion::FileRange(fr) => {
254                        Self::read_chunk_from_file(
255                            chunk,
256                            Path::new(fr.path.as_str()),
257                            fr.range_begin,
258                        )
259                        .await
260                    }
261                    _ => Err(BuckyError::new(
262                        BuckyErrorCode::InvalidFormat,
263                        "unsupport reader",
264                    )),
265                };
266
267                match read_ret {
268                    Ok(reader) => {
269                        return Ok(reader);
270                    }, 
271                    Err(e) => {
272                        // 如果tracker中的pos无法正确读取,从tracker中删除这条记录
273                        let _ = self
274                            .0
275                            .tracker
276                            .remove_position(&RemoveTrackerPositionRequest {
277                                id: chunk.to_string(),
278                                direction: Some(TrackerDirection::Store),
279                                pos: Some(c.pos.clone()),
280                            })
281                            .await;
282                        error!(
283                            "read {} from tracker position {:?} failed for {}",
284                            chunk, c.pos, e
285                        );
286                        continue;
287                    }
288                }
289            }
290
291            error!("read {} from all tracker position failed", chunk);
292            Err(BuckyError::new(
293                BuckyErrorCode::NotFound,
294                "chunk not exists",
295            ))
296        }
297    }
298}
299
300
301struct WriterImpl {
302    path: PathBuf,
303    tmp_path: Option<PathBuf>,
304    chunk: ChunkId,
305    ndc: Box<dyn NamedDataCache>,
306    tracker: Box<dyn TrackerCache>,
307}
308
309#[derive(Clone)]
310pub struct TrackedChunkWriter(Arc<WriterImpl>);
311
312impl std::fmt::Display for TrackedChunkWriter {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        write!(f, "TrackedChunkWriter{{path:{:?}}}", self.path())
315    }
316}
317
318
319impl TrackedChunkWriter {
320    fn from_path(
321        path: &Path,
322        chunk: &ChunkId,
323        ndc: &dyn NamedDataCache,
324        tracker: &dyn TrackerCache,
325    ) -> Self {
326        let tmp_path = format!(
327            "{}-{}",
328            path.file_name().unwrap().to_str().unwrap(),
329            bucky_time_now()
330        );
331        Self::new(
332            path.to_owned(),
333            Some(path.parent().unwrap().join(tmp_path.as_str())),
334            chunk,
335            ndc,
336            tracker,
337        )
338    }
339
340
341    fn new(
342        path: PathBuf,
343        tmp_path: Option<PathBuf>,
344        chunk: &ChunkId,
345        ndc: &dyn NamedDataCache,
346        tracker: &dyn TrackerCache,
347    ) -> Self {
348        Self(Arc::new(WriterImpl {
349            path,
350            tmp_path,
351            chunk: chunk.clone(),
352            ndc: ndc.clone(),
353            tracker: tracker.clone(),
354        }))
355    }
356
357    pub async fn track_path(&self) -> BuckyResult<()> {
358        let request = UpdateChunkStateRequest {
359            chunk_id: self.chunk().clone(),
360            current_state: None,
361            state: ChunkState::Ready,
362        };
363        let _ = self.0.ndc.update_chunk_state(&request).await.map_err(|e| {
364            error!("{} add to tracker failed for {}", self, e);
365            e
366        })?;
367        let request = AddTrackerPositonRequest {
368            id: self.chunk().to_string(),
369            direction: TrackerDirection::Store,
370            pos: TrackerPostion::File(self.path().to_str().unwrap().to_string()),
371            flags: 0,
372        };
373        self.0.tracker.add_position(&request).await.map_err(|e| {
374            error!("{} add to tracker failed for {}", self, e);
375            e
376        })?;
377
378        Ok(())
379    }
380
381    
382    fn path(&self) -> &Path {
383        self.0.path.as_path()
384    }
385
386    fn chunk(&self) -> &ChunkId {
387        &self.0.chunk
388    }
389
390
391    async fn write_inner<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
392        if self.chunk().len() == 0 {
393            return Ok(());
394        }
395
396        let path = self.0.tmp_path.as_ref().map(|p| p.as_path()).unwrap_or(self.path());
397
398        let file = OpenOptions::new().create(true).write(true).open(path).await
399            .map_err(|e| {
400                let msg = format!("{} open file failed for {}", self, e);
401                error!("{}", msg);
402                BuckyError::new(BuckyErrorCode::IoError, msg)
403            })?;
404
405        let _ = async_std::io::copy(reader, file).await
406            .map_err(|e| {
407                let msg = format!(
408                    "{} write chunk file failed for {}",
409                    self, 
410                    e
411                );
412                error!("{}", msg);
413
414                BuckyError::new(BuckyErrorCode::IoError, msg)
415            })?;
416        
417            
418        if self.0.tmp_path.is_some() {
419            let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
420            let ret = fs::rename(tmp_path, self.path()).await;
421            if ret.is_err() {
422                if !self.path().exists() {
423                    let msg = format!("{} rename tmp file failed for {}", self, ret.err().unwrap());
424                    error!("{}", msg);
425
426                    return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
427                }
428            }
429        }
430
431        info!("{} writen chunk to file", self);
432
433        self.track_path().await
434    }
435
436    pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
437        if self.chunk().len() == 0 {
438            return Ok(());
439        }
440
441        let ret = self.write_inner(reader).await;
442
443        if self.0.tmp_path.is_some() {
444            let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
445            let _ = fs::remove_file(tmp_path).await;
446        }
447        
448        ret
449    }
450}
451
452
453struct ListWriterImpl {
454    path: PathBuf,
455    desc: ChunkListDesc,
456    ndc: Box<dyn NamedDataCache>,
457    tracker: Box<dyn TrackerCache>,
458}
459
460#[derive(Clone)]
461pub struct TrackedChunkListWriter(Arc<ListWriterImpl>);
462
463impl std::fmt::Display for TrackedChunkListWriter {
464    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
465        write!(f, "TrackedChunkListWriter{{path:{:?}}}", self.path())
466    }
467}
468
469impl TrackedChunkListWriter {
470    fn new(
471        path: PathBuf, 
472        desc: &ChunkListDesc, 
473        ndc: &dyn NamedDataCache, 
474        tracker: &dyn TrackerCache) -> Self {
475        
476        Self(Arc::new(ListWriterImpl {
477            path, 
478            desc: desc.clone(),  
479            ndc: ndc.clone(), 
480            tracker: tracker.clone(),  
481        }))
482    }
483
484    async fn track_chunk_index(&self, chunk: &ChunkId, index: usize) -> BuckyResult<()> {
485        let offset = self.chunk_list().offset_of(index).unwrap();
486
487        let request = UpdateChunkStateRequest {
488            chunk_id: chunk.clone(),
489            current_state: None,
490            state: ChunkState::Ready,
491        };
492        let _ = self.0.ndc.update_chunk_state(&request).await.map_err(|e| {
493            error!("{} add {} to tracker failed for {}", self, chunk, e);
494            e
495        })?;
496        let request = AddTrackerPositonRequest {
497            id: chunk.to_string(),
498            direction: TrackerDirection::Store,
499            pos: TrackerPostion::FileRange(PostionFileRange {
500                path: self.path().to_str().unwrap().to_string(),
501                range_begin: offset,
502                range_end: offset + chunk.len() as u64,
503            }),
504            flags: 0,
505        };
506        self.0.tracker.add_position(&request).await.map_err(|e| {
507            error!("{} add {} to tracker failed for {}", self, chunk, e);
508            e
509        })?;
510
511        Ok(())
512    }
513
514    pub async fn track_path(&self) -> BuckyResult<()> {
515        for (index, chunk) in self.chunk_list().chunks().iter().enumerate() {
516            let _ = self.track_chunk_index(chunk, index).await?;
517        }
518        Ok(())
519    }
520
521    fn path(&self) -> &Path {
522        self.0.path.as_path()
523    }
524
525    fn chunk_list(&self) -> &ChunkListDesc {
526        &self.0.desc
527    }
528
529    pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
530        // 零长度的chunk不需要触发真正的写入操作
531        if self.chunk_list().total_len() == 0 {
532            return Ok(());
533        }
534
535        let mut reader = reader;
536        let mut file = OpenOptions::new()
537            .create(true)
538            .write(true)
539            .open(self.path())
540            .await
541            .map_err(|e| {
542                let msg = format!("{} open file failed for {}", self, e);
543                error!("{}", msg);
544                BuckyError::new(BuckyErrorCode::IoError, msg)
545            })?;
546
547        // 强制设置为目标大小
548        file.set_len(self.chunk_list().total_len())
549            .await
550            .map_err(|e| {
551                let msg = format!(
552                    "{} create trans data file with len {} failed for {}",
553                    self,
554                    self.chunk_list().total_len(),
555                    e
556                );
557                error!("{}", msg);
558
559                BuckyError::new(BuckyErrorCode::IoError, msg)
560            })?;
561
562        // 强制设置为目标大小
563        file.set_len(self.chunk_list().total_len()).await.map_err(|e| {
564            let msg = format!(
565                "{} create trans data file with len {} failed for {}",
566                self, 
567                self.chunk_list().total_len(),
568                e
569            );
570            error!("{}", msg);
571
572            BuckyError::new(BuckyErrorCode::IoError, msg)
573        })?;
574
575        for (index, chunk) in self.chunk_list().chunks().iter().enumerate() {
576            if chunk.len() == 0 {
577                continue;
578            }
579
580            let mut buffer = vec![0u8; chunk.len()];
581            reader.read_exact(&mut buffer[..]).await?;
582
583            file.write_all(&buffer[..]).await?;
584
585            let _ = self.track_chunk_index(chunk, index).await?;
586        }
587
588        Ok(())
589    }
590}
591
592
593impl TrackedChunkStore {
594    pub async fn chunk_writer(
595        &self,
596        chunk: &ChunkId, 
597        path: PathBuf
598    ) -> BuckyResult<TrackedChunkWriter> {
599        let _ = self.track_chunk(chunk).await?;
600        Ok(TrackedChunkWriter::new(path, None, chunk, self.ndc(), self.tracker()))
601    }
602
603    pub async fn chunk_list_writer(
604        &self,  
605        chunk_list: &ChunkListDesc, 
606        path: PathBuf
607    ) -> BuckyResult<TrackedChunkListWriter> {
608        for chunk in chunk_list.chunks() {
609            let _ = self.track_chunk(chunk).await?;
610        }
611        Ok(TrackedChunkListWriter::new(path, chunk_list, self.ndc(), self.tracker()))
612    }
613
614    pub async fn file_writer(
615        &self,
616        file: &File, 
617        path: PathBuf 
618    ) -> BuckyResult<TrackedChunkListWriter> {
619        let _ = self.track_file(file).await?;
620        Ok(TrackedChunkListWriter::new(path, &ChunkListDesc::from_file(&file)?, self.ndc(), self.tracker()))
621    }
622}