1use log::*;
2use std::{
3 sync::{Arc},
4 path::{Path, PathBuf},
5};
6use async_std::{
7 prelude::*,
8 fs::{self, OpenOptions},
9 io::{SeekFrom, Cursor},
10};
11use cyfs_base::*;
12use cyfs_util::*;
13use crate::{
14 ndn::*
15};
16
17
18struct StoreImpl {
19 ndc: Box<dyn NamedDataCache>,
20 tracker: Box<dyn TrackerCache>,
21}
22
23
24#[derive(Clone)]
25pub struct TrackedChunkStore(Arc<StoreImpl>);
26
27impl TrackedChunkStore {
28 pub fn new(
29 ndc: Box<dyn NamedDataCache>,
30 tracker: Box<dyn TrackerCache>,
31 ) -> Self {
32 Self(Arc::new(StoreImpl {
33 ndc,
34 tracker,
35 }))
36 }
37
38 pub async fn track_chunk(&self, chunk: &ChunkId) -> BuckyResult<()> {
39 let request = InsertChunkRequest {
40 chunk_id: chunk.to_owned(),
41 state: ChunkState::Unknown,
42 ref_objects: None,
43 trans_sessions: None,
44 flags: 0,
45 };
46
47 self.ndc().insert_chunk(&request).await.map_err(|e| {
48 error!("record file chunk to ndc error! chunk={}, {}",chunk, e);
49 e
50 })
51 }
52
53 pub async fn track_file(&self, file: &File) -> BuckyResult<()> {
54 let file_id = file.desc().calculate_id();
55 match file.body() {
56 Some(body) => {
57 let chunk_list = body.content().inner_chunk_list();
58 match chunk_list {
59 Some(chunks) => {
60 for chunk in chunks {
61 let ref_obj = ChunkObjectRef {
63 object_id: file_id.to_owned(),
64 relation: ChunkObjectRelation::FileBody,
65 };
66
67 let req = InsertChunkRequest {
68 chunk_id: chunk.to_owned(),
69 state: ChunkState::Unknown,
70 ref_objects: Some(vec![ref_obj]),
71 trans_sessions: None,
72 flags: 0,
73 };
74
75 self.ndc().insert_chunk(&req).await.map_err(|e| {
76 error!("record file chunk to ndc error! file={}, chunk={}, {}", file_id, chunk, e);
77 e
78 })?;
79
80 info!("insert chunk of file to ndc, chunk:{}, file:{}", chunk, file_id);
81 }
82 Ok(())
83 }
84 None => Err(BuckyError::new(
85 BuckyErrorCode::NotSupport,
86 format!("file object should has chunk list: {}", file_id),
87 )),
88 }
89 }
90 None => {
91 Err(BuckyError::new(
92 BuckyErrorCode::InvalidFormat,
93 format!("file object should has body: {}", file_id),
94 ))
95 }
96 }
97 }
98
99
100 pub async fn track_file_in_path(
101 &self,
102 file: File,
103 path: PathBuf
104 ) -> BuckyResult<()> {
105 let _ = self.track_file(&file).await?;
106 TrackedChunkListWriter::new(
107 path,
108 &ChunkListDesc::from_file(&file)?,
109 self.ndc(),
110 self.tracker()
111 ).track_path().await
112 }
113
114 fn ndc(&self) -> &dyn NamedDataCache {
115 self.0.ndc.as_ref()
116 }
117
118 fn tracker(&self) -> &dyn TrackerCache {
119 self.0.tracker.as_ref()
120 }
121
122
123 async fn read_chunk_from_file(chunk: &ChunkId, path: &Path, offset: u64) -> BuckyResult<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>> {
124 debug!("begin read {} from file {:?}", chunk, path);
125 let mut file = OpenOptions::new()
126 .read(true)
127 .open(path)
128 .await
129 .map_err(|e| {
130 let msg = format!("open file {:?} failed for {}", path, e);
131 error!("{}", msg);
132 BuckyError::new(BuckyErrorCode::IoError, msg)
133 })?;
134
135 let actual_offset = file.seek(SeekFrom::Start(offset)).await.map_err(|e| {
136 let msg = format!("seek file {:?} to offset {} failed for {}", path, offset, e);
137 error!("{}", msg);
138
139 BuckyError::new(BuckyErrorCode::IoError, msg)
140 })?;
141
142 if actual_offset != offset {
143 let msg = format!(
144 "seek file {:?} to offset {} actual offset {}",
145 path, offset, actual_offset
146 );
147 error!("{}", msg);
148
149 return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
150 }
151
152 let mut content = Vec::with_capacity(chunk.len());
153 unsafe { content.set_len(chunk.len()) };
154 file.read_exact(&mut content).await.map_err(|e| {
155 let msg = format!(
156 "read chunk from file {:?} error, chunk={}, len={}, {}",
157 path,
158 chunk,
159 chunk.len(),
160 e
161 );
162 error!("{}", msg);
163
164 BuckyError::new(BuckyErrorCode::IoError, msg)
165 })?;
166
167 let actual_id = ChunkId::calculate(content.as_slice()).await?;
168
169 if actual_id.eq(chunk) {
170 debug!("read {} from file {:?}", chunk, path);
171 Ok(Box::new(Cursor::new(content)))
172 } else {
173 let msg = format!("content in file {:?} not match chunk id", path);
174 error!("{}", msg);
175 Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
176 }
177 }
178
179 async fn is_chunk_stored_in_file(&self, chunk: &ChunkId, path: &Path) -> BuckyResult<bool> {
180 let request = GetTrackerPositionRequest {
181 id: chunk.to_string(),
182 direction: Some(TrackerDirection::Store),
183 };
184 let ret = self.0.tracker.get_position(&request).await?;
185 if ret.len() == 0 {
186 Ok(false)
187 } else {
188 for c in ret {
189 match &c.pos {
190 TrackerPostion::File(exists) => {
191 if path.eq(Path::new(exists)) {
192 return Ok(true);
193 }
194 }
195 TrackerPostion::FileRange(fr) => {
196 if path.eq(Path::new(&fr.path)) {
197 return Ok(true);
198 }
199 }
200 _ => {}
201 }
202 }
203 Ok(false)
204 }
205 }
206}
207
208
209#[async_trait::async_trait]
210impl ChunkReader for TrackedChunkStore {
211 fn clone_as_reader(&self) -> Box<dyn ChunkReader> {
212 Box::new(self.clone())
213 }
214
215 async fn exists(&self, chunk: &ChunkId) -> bool {
216 let request = GetChunkRequest {
217 chunk_id: chunk.clone(),
218 flags: 0,
219 };
220 match self.ndc().get_chunk(&request).await {
221 Ok(c) => {
222 if let Some(c) = c {
223 c.state == ChunkState::Ready
224 } else {
225 false
226 }
227 }
228 Err(e) => {
229 error!("got chunk state {} from database failed for {}", chunk, e);
230 false
231 }
232 }
233 }
234
235 async fn get(&self, chunk: &ChunkId) -> BuckyResult<Box<dyn AsyncReadWithSeek + Unpin + Send + Sync>> {
236 let request = GetTrackerPositionRequest {
237 id: chunk.to_string(),
238 direction: Some(TrackerDirection::Store),
239 };
240 let ret = self.tracker().get_position(&request).await?;
241 if ret.len() == 0 {
242 Err(BuckyError::new(
243 BuckyErrorCode::NotFound,
244 "chunk not exists",
245 ))
246 } else {
247 for c in ret {
248 let read_ret = match &c.pos {
249 TrackerPostion::File(path) => {
251 Self::read_chunk_from_file(chunk, Path::new(path), 0).await
252 }
253 TrackerPostion::FileRange(fr) => {
254 Self::read_chunk_from_file(
255 chunk,
256 Path::new(fr.path.as_str()),
257 fr.range_begin,
258 )
259 .await
260 }
261 _ => Err(BuckyError::new(
262 BuckyErrorCode::InvalidFormat,
263 "unsupport reader",
264 )),
265 };
266
267 match read_ret {
268 Ok(reader) => {
269 return Ok(reader);
270 },
271 Err(e) => {
272 let _ = self
274 .0
275 .tracker
276 .remove_position(&RemoveTrackerPositionRequest {
277 id: chunk.to_string(),
278 direction: Some(TrackerDirection::Store),
279 pos: Some(c.pos.clone()),
280 })
281 .await;
282 error!(
283 "read {} from tracker position {:?} failed for {}",
284 chunk, c.pos, e
285 );
286 continue;
287 }
288 }
289 }
290
291 error!("read {} from all tracker position failed", chunk);
292 Err(BuckyError::new(
293 BuckyErrorCode::NotFound,
294 "chunk not exists",
295 ))
296 }
297 }
298}
299
300
301struct WriterImpl {
302 path: PathBuf,
303 tmp_path: Option<PathBuf>,
304 chunk: ChunkId,
305 ndc: Box<dyn NamedDataCache>,
306 tracker: Box<dyn TrackerCache>,
307}
308
309#[derive(Clone)]
310pub struct TrackedChunkWriter(Arc<WriterImpl>);
311
312impl std::fmt::Display for TrackedChunkWriter {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 write!(f, "TrackedChunkWriter{{path:{:?}}}", self.path())
315 }
316}
317
318
319impl TrackedChunkWriter {
320 fn from_path(
321 path: &Path,
322 chunk: &ChunkId,
323 ndc: &dyn NamedDataCache,
324 tracker: &dyn TrackerCache,
325 ) -> Self {
326 let tmp_path = format!(
327 "{}-{}",
328 path.file_name().unwrap().to_str().unwrap(),
329 bucky_time_now()
330 );
331 Self::new(
332 path.to_owned(),
333 Some(path.parent().unwrap().join(tmp_path.as_str())),
334 chunk,
335 ndc,
336 tracker,
337 )
338 }
339
340
341 fn new(
342 path: PathBuf,
343 tmp_path: Option<PathBuf>,
344 chunk: &ChunkId,
345 ndc: &dyn NamedDataCache,
346 tracker: &dyn TrackerCache,
347 ) -> Self {
348 Self(Arc::new(WriterImpl {
349 path,
350 tmp_path,
351 chunk: chunk.clone(),
352 ndc: ndc.clone(),
353 tracker: tracker.clone(),
354 }))
355 }
356
357 pub async fn track_path(&self) -> BuckyResult<()> {
358 let request = UpdateChunkStateRequest {
359 chunk_id: self.chunk().clone(),
360 current_state: None,
361 state: ChunkState::Ready,
362 };
363 let _ = self.0.ndc.update_chunk_state(&request).await.map_err(|e| {
364 error!("{} add to tracker failed for {}", self, e);
365 e
366 })?;
367 let request = AddTrackerPositonRequest {
368 id: self.chunk().to_string(),
369 direction: TrackerDirection::Store,
370 pos: TrackerPostion::File(self.path().to_str().unwrap().to_string()),
371 flags: 0,
372 };
373 self.0.tracker.add_position(&request).await.map_err(|e| {
374 error!("{} add to tracker failed for {}", self, e);
375 e
376 })?;
377
378 Ok(())
379 }
380
381
382 fn path(&self) -> &Path {
383 self.0.path.as_path()
384 }
385
386 fn chunk(&self) -> &ChunkId {
387 &self.0.chunk
388 }
389
390
391 async fn write_inner<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
392 if self.chunk().len() == 0 {
393 return Ok(());
394 }
395
396 let path = self.0.tmp_path.as_ref().map(|p| p.as_path()).unwrap_or(self.path());
397
398 let file = OpenOptions::new().create(true).write(true).open(path).await
399 .map_err(|e| {
400 let msg = format!("{} open file failed for {}", self, e);
401 error!("{}", msg);
402 BuckyError::new(BuckyErrorCode::IoError, msg)
403 })?;
404
405 let _ = async_std::io::copy(reader, file).await
406 .map_err(|e| {
407 let msg = format!(
408 "{} write chunk file failed for {}",
409 self,
410 e
411 );
412 error!("{}", msg);
413
414 BuckyError::new(BuckyErrorCode::IoError, msg)
415 })?;
416
417
418 if self.0.tmp_path.is_some() {
419 let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
420 let ret = fs::rename(tmp_path, self.path()).await;
421 if ret.is_err() {
422 if !self.path().exists() {
423 let msg = format!("{} rename tmp file failed for {}", self, ret.err().unwrap());
424 error!("{}", msg);
425
426 return Err(BuckyError::new(BuckyErrorCode::IoError, msg));
427 }
428 }
429 }
430
431 info!("{} writen chunk to file", self);
432
433 self.track_path().await
434 }
435
436 pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
437 if self.chunk().len() == 0 {
438 return Ok(());
439 }
440
441 let ret = self.write_inner(reader).await;
442
443 if self.0.tmp_path.is_some() {
444 let tmp_path = self.0.tmp_path.as_ref().unwrap().as_path();
445 let _ = fs::remove_file(tmp_path).await;
446 }
447
448 ret
449 }
450}
451
452
453struct ListWriterImpl {
454 path: PathBuf,
455 desc: ChunkListDesc,
456 ndc: Box<dyn NamedDataCache>,
457 tracker: Box<dyn TrackerCache>,
458}
459
460#[derive(Clone)]
461pub struct TrackedChunkListWriter(Arc<ListWriterImpl>);
462
463impl std::fmt::Display for TrackedChunkListWriter {
464 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
465 write!(f, "TrackedChunkListWriter{{path:{:?}}}", self.path())
466 }
467}
468
469impl TrackedChunkListWriter {
470 fn new(
471 path: PathBuf,
472 desc: &ChunkListDesc,
473 ndc: &dyn NamedDataCache,
474 tracker: &dyn TrackerCache) -> Self {
475
476 Self(Arc::new(ListWriterImpl {
477 path,
478 desc: desc.clone(),
479 ndc: ndc.clone(),
480 tracker: tracker.clone(),
481 }))
482 }
483
484 async fn track_chunk_index(&self, chunk: &ChunkId, index: usize) -> BuckyResult<()> {
485 let offset = self.chunk_list().offset_of(index).unwrap();
486
487 let request = UpdateChunkStateRequest {
488 chunk_id: chunk.clone(),
489 current_state: None,
490 state: ChunkState::Ready,
491 };
492 let _ = self.0.ndc.update_chunk_state(&request).await.map_err(|e| {
493 error!("{} add {} to tracker failed for {}", self, chunk, e);
494 e
495 })?;
496 let request = AddTrackerPositonRequest {
497 id: chunk.to_string(),
498 direction: TrackerDirection::Store,
499 pos: TrackerPostion::FileRange(PostionFileRange {
500 path: self.path().to_str().unwrap().to_string(),
501 range_begin: offset,
502 range_end: offset + chunk.len() as u64,
503 }),
504 flags: 0,
505 };
506 self.0.tracker.add_position(&request).await.map_err(|e| {
507 error!("{} add {} to tracker failed for {}", self, chunk, e);
508 e
509 })?;
510
511 Ok(())
512 }
513
514 pub async fn track_path(&self) -> BuckyResult<()> {
515 for (index, chunk) in self.chunk_list().chunks().iter().enumerate() {
516 let _ = self.track_chunk_index(chunk, index).await?;
517 }
518 Ok(())
519 }
520
521 fn path(&self) -> &Path {
522 self.0.path.as_path()
523 }
524
525 fn chunk_list(&self) -> &ChunkListDesc {
526 &self.0.desc
527 }
528
529 pub async fn write<R: async_std::io::Read + Unpin>(&self, reader: R) -> BuckyResult<()> {
530 if self.chunk_list().total_len() == 0 {
532 return Ok(());
533 }
534
535 let mut reader = reader;
536 let mut file = OpenOptions::new()
537 .create(true)
538 .write(true)
539 .open(self.path())
540 .await
541 .map_err(|e| {
542 let msg = format!("{} open file failed for {}", self, e);
543 error!("{}", msg);
544 BuckyError::new(BuckyErrorCode::IoError, msg)
545 })?;
546
547 file.set_len(self.chunk_list().total_len())
549 .await
550 .map_err(|e| {
551 let msg = format!(
552 "{} create trans data file with len {} failed for {}",
553 self,
554 self.chunk_list().total_len(),
555 e
556 );
557 error!("{}", msg);
558
559 BuckyError::new(BuckyErrorCode::IoError, msg)
560 })?;
561
562 file.set_len(self.chunk_list().total_len()).await.map_err(|e| {
564 let msg = format!(
565 "{} create trans data file with len {} failed for {}",
566 self,
567 self.chunk_list().total_len(),
568 e
569 );
570 error!("{}", msg);
571
572 BuckyError::new(BuckyErrorCode::IoError, msg)
573 })?;
574
575 for (index, chunk) in self.chunk_list().chunks().iter().enumerate() {
576 if chunk.len() == 0 {
577 continue;
578 }
579
580 let mut buffer = vec![0u8; chunk.len()];
581 reader.read_exact(&mut buffer[..]).await?;
582
583 file.write_all(&buffer[..]).await?;
584
585 let _ = self.track_chunk_index(chunk, index).await?;
586 }
587
588 Ok(())
589 }
590}
591
592
593impl TrackedChunkStore {
594 pub async fn chunk_writer(
595 &self,
596 chunk: &ChunkId,
597 path: PathBuf
598 ) -> BuckyResult<TrackedChunkWriter> {
599 let _ = self.track_chunk(chunk).await?;
600 Ok(TrackedChunkWriter::new(path, None, chunk, self.ndc(), self.tracker()))
601 }
602
603 pub async fn chunk_list_writer(
604 &self,
605 chunk_list: &ChunkListDesc,
606 path: PathBuf
607 ) -> BuckyResult<TrackedChunkListWriter> {
608 for chunk in chunk_list.chunks() {
609 let _ = self.track_chunk(chunk).await?;
610 }
611 Ok(TrackedChunkListWriter::new(path, chunk_list, self.ndc(), self.tracker()))
612 }
613
614 pub async fn file_writer(
615 &self,
616 file: &File,
617 path: PathBuf
618 ) -> BuckyResult<TrackedChunkListWriter> {
619 let _ = self.track_file(file).await?;
620 Ok(TrackedChunkListWriter::new(path, &ChunkListDesc::from_file(&file)?, self.ndc(), self.tracker()))
621 }
622}