cyfs_bdt/utils/ndn/
local_store.rs

1use log::*;
2use std::{
3    sync::{Arc},
4    path::{Path, PathBuf},
5};
6use async_std::{
7    prelude::*, 
8    fs::{self, OpenOptions},  
9};
10use cyfs_base::*;
11
12use crate::{
13    ndn::*
14};
15
16
17struct WriterImpl {
18    path: PathBuf,
19    tmp_path: Option<PathBuf>,
20    chunk: ChunkId,
21}
22
23#[derive(Clone)]
24pub struct LocalChunkWriter(Arc<WriterImpl>);
25
26impl std::fmt::Display for LocalChunkWriter {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        write!(f, "LocalChunkWriter{{path:{:?}}}", self.path())
29    }
30}
31
32
33impl LocalChunkWriter {
34    pub fn from_path(
35        path: &Path,
36        chunk: &ChunkId,
37    ) -> Self {
38        let tmp_path = format!(
39            "{}-{}",
40            path.file_name().unwrap().to_str().unwrap(),
41            bucky_time_now()
42        );
43        Self::new(
44            path.to_owned(),
45            Some(path.parent().unwrap().join(tmp_path.as_str())),
46            chunk,
47        )
48    }
49
50
51    pub fn new(
52        path: PathBuf,
53        tmp_path: Option<PathBuf>,
54        chunk: &ChunkId,
55    ) -> Self {
56        Self(Arc::new(WriterImpl {
57            path,
58            tmp_path,
59            chunk: chunk.clone(),
60        }))
61    }
62
63    
64    fn path(&self) -> &Path {
65        self.0.path.as_path()
66    }
67
68    fn chunk(&self) -> &ChunkId {
69        &self.0.chunk
70    }
71
72
73    async fn write_inner<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
74        if self.chunk().len() == 0 {
75            return Ok(());
76        }
77
78        let path = self.0.tmp_path.as_ref().map(|p| p.as_path()).unwrap_or(self.path());
79
80        let file = OpenOptions::new().create(true).write(true).open(path).await
81            .map_err(|e| {
82                let msg = format!("{} open file failed for {}", self, e);
83                error!("{}", msg);
84                BuckyError::new(BuckyErrorCode::IoError, msg)
85            })?;
86
87        let _ = async_std::io::copy(reader, file).await
88            .map_err(|e| {
89                let msg = format!(
90                    "{} write chunk file failed for {}",
91                    self, 
92                    e
93                );
94                error!("{}", msg);
95
96                BuckyError::new(BuckyErrorCode::IoError, msg)
97            })?;
98        
99            
100        if self.0.tmp_path.is_some() {
101            let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
102            let ret = fs::rename(tmp_path, self.path()).await;
103            if ret.is_err() {
104                if !self.path().exists() {
105                    let msg = format!("{} rename tmp file failed for {}", self, ret.err().unwrap());
106                    error!("{}", msg);
107
108                    return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
109                }
110            }
111        }
112
113        info!("{} writen chunk to file", self);
114
115        Ok(())
116    }
117
118    pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
119        if self.chunk().len() == 0 {
120            return Ok(());
121        }
122
123        let ret = self.write_inner(reader).await;
124
125        if self.0.tmp_path.is_some() {
126            let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
127            let _ = fs::remove_file(tmp_path).await;
128        }
129        
130        ret
131    }
132}
133
134
135struct ListWriterImpl {
136    path: PathBuf,
137    desc: ChunkListDesc,
138}
139
140#[derive(Clone)]
141pub struct LocalChunkListWriter(Arc<ListWriterImpl>);
142
143impl std::fmt::Display for LocalChunkListWriter {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        write!(f, "LocalChunkListWriter{{path:{:?}}}", self.path())
146    }
147}
148
149impl LocalChunkListWriter {
150    pub fn from_file(
151        path: PathBuf, 
152        file: &File
153    ) -> BuckyResult<Self> {
154        Ok(Self::new(path, &ChunkListDesc::from_file(&file)?))
155    }
156
157    pub fn new(
158        path: PathBuf, 
159        desc: &ChunkListDesc
160    ) -> Self {
161        
162        Self(Arc::new(ListWriterImpl {
163            path, 
164            desc: desc.clone(),  
165        }))
166    }
167
168
169    fn path(&self) -> &Path {
170        self.0.path.as_path()
171    }
172
173    fn chunk_list(&self) -> &ChunkListDesc {
174        &self.0.desc
175    }
176
177    pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
178        // 零长度的chunk不需要触发真正的写入操作
179        if self.chunk_list().total_len() == 0 {
180            return Ok(());
181        }
182
183        let mut reader = reader;
184        let mut file = OpenOptions::new()
185            .create(true)
186            .write(true)
187            .open(self.path())
188            .await
189            .map_err(|e| {
190                let msg = format!("{} open file failed for {}", self, e);
191                error!("{}", msg);
192                BuckyError::new(BuckyErrorCode::IoError, msg)
193            })?;
194
195        // 强制设置为目标大小
196        file.set_len(self.chunk_list().total_len())
197            .await
198            .map_err(|e| {
199                let msg = format!(
200                    "{} create trans data file with len {} failed for {}",
201                    self,
202                    self.chunk_list().total_len(),
203                    e
204                );
205                error!("{}", msg);
206
207                BuckyError::new(BuckyErrorCode::IoError, msg)
208            })?;
209
210        // 强制设置为目标大小
211        file.set_len(self.chunk_list().total_len()).await.map_err(|e| {
212            let msg = format!(
213                "{} create trans data file with len {} failed for {}",
214                self, 
215                self.chunk_list().total_len(),
216                e
217            );
218            error!("{}", msg);
219
220            BuckyError::new(BuckyErrorCode::IoError, msg)
221        })?;
222
223        for chunk in self.chunk_list().chunks().iter() {
224            if chunk.len() == 0 {
225                continue;
226            }
227
228            let mut buffer = vec![0u8; chunk.len()];
229            reader.read_exact(&mut buffer[..]).await?;
230
231            file.write_all(&buffer[..]).await?;
232        }
233
234        Ok(())
235    }
236}
237
238
239
240pub fn local_chunk_writer(
241    chunk: &ChunkId, 
242    path: PathBuf
243) -> LocalChunkWriter {
244    LocalChunkWriter::new(path, None, chunk)
245}
246
247pub fn local_file_writer(
248    file: &File, 
249    path: PathBuf 
250) -> BuckyResult<LocalChunkListWriter> {
251    Ok(LocalChunkListWriter::new(path, &ChunkListDesc::from_file(&file)?))
252}