cyfs_bdt/utils/ndn/
local_store.rs1use log::*;
2use std::{
3 sync::{Arc},
4 path::{Path, PathBuf},
5};
6use async_std::{
7 prelude::*,
8 fs::{self, OpenOptions},
9};
10use cyfs_base::*;
11
12use crate::{
13 ndn::*
14};
15
16
17struct WriterImpl {
18 path: PathBuf,
19 tmp_path: Option<PathBuf>,
20 chunk: ChunkId,
21}
22
23#[derive(Clone)]
24pub struct LocalChunkWriter(Arc<WriterImpl>);
25
26impl std::fmt::Display for LocalChunkWriter {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 write!(f, "LocalChunkWriter{{path:{:?}}}", self.path())
29 }
30}
31
32
33impl LocalChunkWriter {
34 pub fn from_path(
35 path: &Path,
36 chunk: &ChunkId,
37 ) -> Self {
38 let tmp_path = format!(
39 "{}-{}",
40 path.file_name().unwrap().to_str().unwrap(),
41 bucky_time_now()
42 );
43 Self::new(
44 path.to_owned(),
45 Some(path.parent().unwrap().join(tmp_path.as_str())),
46 chunk,
47 )
48 }
49
50
51 pub fn new(
52 path: PathBuf,
53 tmp_path: Option<PathBuf>,
54 chunk: &ChunkId,
55 ) -> Self {
56 Self(Arc::new(WriterImpl {
57 path,
58 tmp_path,
59 chunk: chunk.clone(),
60 }))
61 }
62
63
64 fn path(&self) -> &Path {
65 self.0.path.as_path()
66 }
67
68 fn chunk(&self) -> &ChunkId {
69 &self.0.chunk
70 }
71
72
73 async fn write_inner<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
74 if self.chunk().len() == 0 {
75 return Ok(());
76 }
77
78 let path = self.0.tmp_path.as_ref().map(|p| p.as_path()).unwrap_or(self.path());
79
80 let file = OpenOptions::new().create(true).write(true).open(path).await
81 .map_err(|e| {
82 let msg = format!("{} open file failed for {}", self, e);
83 error!("{}", msg);
84 BuckyError::new(BuckyErrorCode::IoError, msg)
85 })?;
86
87 let _ = async_std::io::copy(reader, file).await
88 .map_err(|e| {
89 let msg = format!(
90 "{} write chunk file failed for {}",
91 self,
92 e
93 );
94 error!("{}", msg);
95
96 BuckyError::new(BuckyErrorCode::IoError, msg)
97 })?;
98
99
100 if self.0.tmp_path.is_some() {
101 let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
102 let ret = fs::rename(tmp_path, self.path()).await;
103 if ret.is_err() {
104 if !self.path().exists() {
105 let msg = format!("{} rename tmp file failed for {}", self, ret.err().unwrap());
106 error!("{}", msg);
107
108 return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
109 }
110 }
111 }
112
113 info!("{} writen chunk to file", self);
114
115 Ok(())
116 }
117
118 pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
119 if self.chunk().len() == 0 {
120 return Ok(());
121 }
122
123 let ret = self.write_inner(reader).await;
124
125 if self.0.tmp_path.is_some() {
126 let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
127 let _ = fs::remove_file(tmp_path).await;
128 }
129
130 ret
131 }
132}
133
134
135struct ListWriterImpl {
136 path: PathBuf,
137 desc: ChunkListDesc,
138}
139
140#[derive(Clone)]
141pub struct LocalChunkListWriter(Arc<ListWriterImpl>);
142
143impl std::fmt::Display for LocalChunkListWriter {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 write!(f, "LocalChunkListWriter{{path:{:?}}}", self.path())
146 }
147}
148
149impl LocalChunkListWriter {
150 pub fn from_file(
151 path: PathBuf,
152 file: &File
153 ) -> BuckyResult<Self> {
154 Ok(Self::new(path, &ChunkListDesc::from_file(&file)?))
155 }
156
157 pub fn new(
158 path: PathBuf,
159 desc: &ChunkListDesc
160 ) -> Self {
161
162 Self(Arc::new(ListWriterImpl {
163 path,
164 desc: desc.clone(),
165 }))
166 }
167
168
169 fn path(&self) -> &Path {
170 self.0.path.as_path()
171 }
172
173 fn chunk_list(&self) -> &ChunkListDesc {
174 &self.0.desc
175 }
176
177 pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
178 if self.chunk_list().total_len() == 0 {
180 return Ok(());
181 }
182
183 let mut reader = reader;
184 let mut file = OpenOptions::new()
185 .create(true)
186 .write(true)
187 .open(self.path())
188 .await
189 .map_err(|e| {
190 let msg = format!("{} open file failed for {}", self, e);
191 error!("{}", msg);
192 BuckyError::new(BuckyErrorCode::IoError, msg)
193 })?;
194
195 file.set_len(self.chunk_list().total_len())
197 .await
198 .map_err(|e| {
199 let msg = format!(
200 "{} create trans data file with len {} failed for {}",
201 self,
202 self.chunk_list().total_len(),
203 e
204 );
205 error!("{}", msg);
206
207 BuckyError::new(BuckyErrorCode::IoError, msg)
208 })?;
209
210 file.set_len(self.chunk_list().total_len()).await.map_err(|e| {
212 let msg = format!(
213 "{} create trans data file with len {} failed for {}",
214 self,
215 self.chunk_list().total_len(),
216 e
217 );
218 error!("{}", msg);
219
220 BuckyError::new(BuckyErrorCode::IoError, msg)
221 })?;
222
223 for chunk in self.chunk_list().chunks().iter() {
224 if chunk.len() == 0 {
225 continue;
226 }
227
228 let mut buffer = vec![0u8; chunk.len()];
229 reader.read_exact(&mut buffer[..]).await?;
230
231 file.write_all(&buffer[..]).await?;
232 }
233
234 Ok(())
235 }
236}
237
238
239
240pub fn local_chunk_writer(
241 chunk: &ChunkId,
242 path: PathBuf
243) -> LocalChunkWriter {
244 LocalChunkWriter::new(path, None, chunk)
245}
246
247pub fn local_file_writer(
248 file: &File,
249 path: PathBuf
250) -> BuckyResult<LocalChunkListWriter> {
251 Ok(LocalChunkListWriter::new(path, &ChunkListDesc::from_file(&file)?))
252}