1use std::path::{Path, PathBuf};
2
3use blake3::{Hash, HexError};
4use miniz_oxide::{deflate::compress_to_vec, inflate::decompress_to_vec};
5use nanoid::nanoid;
6use thiserror::Error;
7use tokio::{
8 fs::{create_dir_all, read, read_dir, remove_dir_all, remove_file, rename, write, File},
9 io::{AsyncReadExt, AsyncWriteExt},
10};
11
12use crate::model::Record;
13
14use super::model::{Blob, ObjectId, Repository, Tree};
15
/// Guard that keeps a [`Repository`] locked while it is alive.
///
/// `RepoGuard::new` takes the repository lock; dropping the guard removes the repository's
/// `temp` directory (if present) and releases the lock again.
pub struct RepoGuard {
    /// Clone of the repository handle that was locked by this guard.
    pub repo: Repository,
}
19
20impl RepoGuard {
21 pub async fn new(repo: &Repository) -> Result<Self, WsvcFsError> {
22 repo.check_lock().await?;
23 repo.lock().await?;
24 Ok(Self { repo: repo.clone() })
25 }
26}
27
28impl Drop for RepoGuard {
29 fn drop(&mut self) {
30 let repo = self.repo.clone();
31 let temp_dir = self.repo.path.join("temp");
32 if temp_dir.exists() {
33 std::fs::remove_dir_all(temp_dir).ok();
34 }
35 repo.unlock().ok();
36 }
37}
38
/// Errors produced by the file-system layer of the repository.
#[derive(Error, Debug)]
pub enum WsvcFsError {
    /// An underlying I/O operation failed.
    #[error("os file system error: {0}")]
    Os(#[from] std::io::Error),
    /// A stored blob could not be decoded; the payload names the check that failed.
    #[error("decompress error: {0}")]
    DecompressFailed(String),
    /// The given path does not exist or does not have the expected repository layout.
    #[error("unknown path: {0}")]
    UnknownPath(String),
    /// A string could not be parsed as a hexadecimal hash.
    #[error("invalid hex string: {0}")]
    InvalidHexString(#[from] HexError),
    /// `serde_json` failed while encoding or decoding a tree or record.
    #[error("serialize failed: {0}")]
    SerializationFailed(#[from] serde_json::Error),
    /// A path has no final file-name component.
    #[error("invalid filename: {0}")]
    InvalidFilename(String),
    /// A file name, path or file content is not valid UTF-8.
    #[error("invalid OsString: {0}")]
    InvalidOsString(String),
    /// The directory for a new repository already exists.
    #[error("dir already exists: {0}")]
    DirAlreadyExists(String),
    /// Nothing changed since an existing record; the payload is that record's hash.
    #[error("no changes with record: {0}")]
    NoChanges(String),
    /// The `LOCK` file holds a token that differs from this handle's token.
    #[error("workspace is locked")]
    WorkspaceLocked,
    /// The `ORIGIN` file could not be read.
    #[error("remote not set")]
    RemoteNotSet,
}
65
/// In-memory directory tree produced by `build_tree` and consumed by `store_tree_file_impl`.
///
/// Unlike the persisted `Tree`, child directories are held by value rather than by hash.
#[derive(Clone, Debug)]
struct TreeImpl {
    /// Name of this directory (its final path component).
    name: String,
    /// Sub-directories of this directory.
    trees: Vec<TreeImpl>,
    /// Regular files directly inside this directory.
    blobs: Vec<Blob>,
}
72
73async fn store_blob_file_impl(
75 path: impl AsRef<Path>,
76 objects_dir: impl AsRef<Path>,
77 temp: impl AsRef<Path>,
78) -> Result<ObjectId, WsvcFsError> {
79 if !temp.as_ref().exists() {
80 create_dir_all(temp.as_ref()).await?;
81 }
82 let mut buffer: [u8; 16384] = [0; 16384];
83 let mut file = File::open(&path).await?;
84 let compressed_file_path = temp.as_ref().join(nanoid!());
85 let mut compressed_file = File::create(&compressed_file_path).await?;
86 let mut hasher = blake3::Hasher::new();
87 loop {
88 let n = file.read(&mut buffer).await?;
89 if n == 0 {
90 break;
91 }
92 hasher.update(&buffer[..n]);
93 let compressed_data = compress_to_vec(&buffer[..n], 8);
94 compressed_file
95 .write_all(&[
96 0x78,
97 0xda,
98 (&compressed_data.len() / 256).try_into().unwrap(),
99 (&compressed_data.len() % 256).try_into().unwrap(),
100 ])
101 .await?;
102 compressed_file.write_all(&compressed_data).await?;
103 }
104 let hash = hasher.finalize();
105 let blob = objects_dir.as_ref().join(hash.to_hex().as_str());
106 rename(&compressed_file_path, &blob).await?;
107 Ok(ObjectId(hash))
108}
109
110async fn checkout_blob_file_impl(
112 path: impl AsRef<Path>,
113 objects_dir: impl AsRef<Path>,
114 blob_hash: &ObjectId,
115 temp: impl AsRef<Path>,
116) -> Result<(), WsvcFsError> {
117 let blob_path = objects_dir.as_ref().join(blob_hash.0.to_hex().as_str());
118 let mut buffer: [u8; 32768] = [0; 32768];
119 let mut header_buffer: [u8; 4] = [0; 4];
120 let mut file = File::open(&blob_path).await?;
121 let decompressed_file_path = temp.as_ref().join(nanoid!());
122 let mut decompressed_file = File::create(&decompressed_file_path).await?;
123 loop {
124 let n = file.read(&mut header_buffer).await?;
125 if n == 0 {
126 break;
127 }
128 if header_buffer[0] != 0x78 || header_buffer[1] != 0xda {
129 return Err(WsvcFsError::DecompressFailed(
130 "magic header not match".to_owned(),
131 ));
132 }
133 let size = (header_buffer[2] as usize) * 256 + (header_buffer[3] as usize);
134 let n = file.read(&mut buffer[..size]).await?;
135 if n != size {
136 return Err(WsvcFsError::DecompressFailed("broken chunk".to_owned()));
137 }
138 decompressed_file
139 .write_all(
140 &decompress_to_vec(&buffer[..n])
141 .map_err(|_| WsvcFsError::DecompressFailed("decode chunk failed".to_owned()))?,
142 )
143 .await?;
144 }
145 rename(&decompressed_file_path, path).await?;
146 Ok(())
147}
148
149#[async_recursion::async_recursion(?Send)]
155async fn store_tree_file_impl(
156 tree: TreeImpl,
157 trees_dir: &Path,
158) -> Result<(Tree, bool), WsvcFsError> {
159 let mut result = Tree {
160 name: tree.name,
161 hash: ObjectId(Hash::from([0; 32])),
162 trees: vec![],
163 blobs: tree.blobs.clone(),
164 };
165 for tree in tree.trees {
166 result
167 .trees
168 .push(store_tree_file_impl(tree, trees_dir).await?.0.hash);
169 }
170 let hash = blake3::hash(serde_json::to_vec(&result)?.as_slice());
171 result.hash = ObjectId(hash);
172 let tree_file_path = trees_dir.join(hash.to_hex().as_str());
173 if !tree_file_path.exists() {
174 write(
175 trees_dir.join(hash.to_string()),
176 serde_json::to_vec(&result)?,
177 )
178 .await?;
179 return Ok((result, true));
180 }
181
182 Ok((result, false))
183}
184
185#[async_recursion::async_recursion(?Send)]
189async fn build_tree(root: &Path, work_dir: &Path) -> Result<TreeImpl, WsvcFsError> {
190 let mut result = TreeImpl {
191 name: work_dir
192 .file_name()
193 .unwrap_or(std::ffi::OsStr::new("."))
194 .to_str()
195 .ok_or(WsvcFsError::InvalidOsString(format!("{:?}", work_dir)))?
196 .to_string(),
197 trees: vec![],
198 blobs: vec![],
199 };
200 let mut entries = read_dir(work_dir).await?;
201 while let Some(entry) = entries.next_entry().await? {
202 let entry_type = entry.file_type().await?;
203 if entry_type.is_dir() {
204 if entry.file_name() == ".wsvc" {
205 continue;
206 }
207 result.trees.push(build_tree(root, &entry.path()).await?);
208 } else if entry_type.is_file() {
209 result.blobs.push(
210 Blob {
211 name: entry
212 .file_name()
213 .to_str()
214 .ok_or(WsvcFsError::InvalidOsString(format!("{:?}", entry)))?
215 .to_string(),
216 hash: store_blob_file_impl(
217 &entry.path(),
218 &root.join("objects"),
219 &root.join("temp"),
220 )
221 .await?,
222 }
223 .clone(),
224 );
225 }
226 }
227 Ok(result)
228}
229
230impl Repository {
231 pub async fn new(path: impl AsRef<Path>, is_bare: bool) -> Result<Self, WsvcFsError> {
233 let mut path = path.as_ref().to_owned();
234 if !is_bare {
235 path = path.join(".wsvc");
236 }
237 if !path.exists() {
238 create_dir_all(path.join("objects")).await?;
239 create_dir_all(path.join("trees")).await?;
240 create_dir_all(path.join("records")).await?;
241 write(path.join("HEAD"), "").await?;
242 } else {
243 return Err(WsvcFsError::DirAlreadyExists(format!("{:?}", path)));
244 }
245 Ok(Self {
246 path,
247 lock: nanoid!(),
248 })
249 }
250
251 pub async fn open(path: impl AsRef<Path>, is_bare: bool) -> Result<Self, WsvcFsError> {
253 let mut path = path.as_ref().to_owned();
254 if !is_bare {
255 path = path.join(".wsvc");
256 }
257 if !path.exists() {
258 return Err(WsvcFsError::UnknownPath(
259 path.to_str()
260 .ok_or(WsvcFsError::InvalidOsString(format!("{:?}", path)))?
261 .to_string(),
262 ));
263 }
264 if path.join("objects").exists()
265 && path.join("trees").exists()
266 && path.join("records").exists()
267 && path.join("HEAD").exists()
268 {
269 Ok(Self {
270 path,
271 lock: nanoid!(),
272 })
273 } else {
274 Err(WsvcFsError::UnknownPath(
275 path.to_str()
276 .ok_or(WsvcFsError::InvalidOsString(format!("{:?}", path)))?
277 .to_string(),
278 ))
279 }
280 }
281
282 pub async fn lock(&self) -> Result<(), WsvcFsError> {
288 write(self.path.join("LOCK"), self.lock.clone()).await?;
289 Ok(())
290 }
291
292 pub async fn check_lock(&self) -> Result<(), WsvcFsError> {
294 let lock_path = self.path.join("LOCK");
295 if !lock_path.exists() {
296 return Ok(());
297 }
298 let lock = read(&lock_path).await?;
299 if String::from_utf8(lock)
300 .map_err(|err| WsvcFsError::InvalidOsString(format!("{:?}", err)))?
301 == self.lock
302 {
303 Ok(())
304 } else {
305 Err(WsvcFsError::WorkspaceLocked)
306 }
307 }
308
309 pub fn unlock(&self) -> Result<(), WsvcFsError> {
311 std::fs::remove_file(self.path.join("LOCK"))?;
312 Ok(())
313 }
314
315 pub async fn try_open(path: impl AsRef<Path>) -> Result<Self, WsvcFsError> {
319 if let Ok(repo) = Repository::open(&path, false).await {
320 Ok(repo)
321 } else {
322 Repository::open(&path, true).await
323 }
324 }
325
326 pub async fn temp_dir(&self) -> Result<PathBuf, WsvcFsError> {
328 let result = self.path.join("temp");
329 if !result.exists() {
330 create_dir_all(&result).await?;
331 }
332 Ok(result)
333 }
334
335 pub async fn objects_dir(&self) -> Result<PathBuf, WsvcFsError> {
337 let result = self.path.join("objects");
338 if !result.exists() {
339 create_dir_all(&result).await?;
340 }
341 Ok(result)
342 }
343
344 pub async fn trees_dir(&self) -> Result<PathBuf, WsvcFsError> {
346 let result = self.path.join("trees");
347 if !result.exists() {
348 create_dir_all(&result).await?;
349 }
350 Ok(result)
351 }
352
353 pub async fn records_dir(&self) -> Result<PathBuf, WsvcFsError> {
355 let result = self.path.join("records");
356 if !result.exists() {
357 create_dir_all(&result).await?;
358 }
359 Ok(result)
360 }
361
362 pub async fn store_blob(
364 &self,
365 workspace: impl AsRef<Path>,
366 rel_path: impl AsRef<Path>,
367 ) -> Result<Blob, WsvcFsError> {
368 Ok(Blob {
369 name: rel_path
370 .as_ref()
371 .file_name()
372 .ok_or(WsvcFsError::InvalidFilename(format!(
373 "{:?}",
374 rel_path.as_ref()
375 )))?
376 .to_str()
377 .ok_or(WsvcFsError::InvalidOsString(format!(
378 "{:?}",
379 rel_path.as_ref()
380 )))?
381 .to_string(),
382 hash: store_blob_file_impl(
383 workspace.as_ref().join(rel_path),
384 &self.objects_dir().await?,
385 &self.temp_dir().await?,
386 )
387 .await?,
388 })
389 }
390
391 pub async fn checkout_blob(
393 &self,
394 blob_hash: &ObjectId,
395 workspace: impl AsRef<Path>,
396 rel_path: impl AsRef<Path>,
397 ) -> Result<(), WsvcFsError> {
398 checkout_blob_file_impl(
399 &workspace.as_ref().join(rel_path),
400 &self.objects_dir().await?,
401 blob_hash,
402 &self.temp_dir().await?,
403 )
404 .await
405 }
406
407 pub async fn blob_exists(&self, blob_hash: &ObjectId) -> Result<bool, WsvcFsError> {
408 Ok(self
409 .objects_dir()
410 .await?
411 .join(blob_hash.0.to_hex().as_str())
412 .exists())
413 }
414
415 pub async fn read_blob(&self, blob_hash: &ObjectId) -> Result<Vec<u8>, WsvcFsError> {
417 let blob_path = self
418 .objects_dir()
419 .await?
420 .join(blob_hash.0.to_hex().as_str());
421 let mut buffer: [u8; 32768] = [0; 32768];
422 let mut header_buffer: [u8; 4] = [0; 4];
423 let mut file = File::open(&blob_path).await?;
424 let mut result = Vec::new();
425 loop {
426 let n = file.read(&mut header_buffer).await?;
427 if n == 0 {
428 break;
429 }
430 if header_buffer[0] != 0x78 || header_buffer[1] != 0xda {
431 return Err(WsvcFsError::DecompressFailed(
432 "magic header not match".to_owned(),
433 ));
434 }
435 let size = (header_buffer[2] as usize) * 256 + (header_buffer[3] as usize);
436 let n = file.read(&mut buffer[..size]).await?;
437 if n != size {
438 return Err(WsvcFsError::DecompressFailed("broken chunk".to_owned()));
439 }
440 result
441 .extend_from_slice(&decompress_to_vec(&buffer[..n]).map_err(|_| {
442 WsvcFsError::DecompressFailed("decode chunk failed".to_owned())
443 })?);
444 }
445 Ok(result)
446 }
447
448 pub async fn write_tree_recursively(
450 &self,
451 workspace: impl AsRef<Path> + Clone,
452 ) -> Result<(Tree, bool), WsvcFsError> {
453 let stored_tree = build_tree(&self.path, workspace.as_ref()).await?;
454 let result = store_tree_file_impl(stored_tree, &self.trees_dir().await?).await?;
455 Ok(result)
456 }
457
458 pub async fn tree_exists(&self, tree_hash: &ObjectId) -> Result<bool, WsvcFsError> {
459 Ok(self
460 .trees_dir()
461 .await?
462 .join(tree_hash.0.to_hex().as_str())
463 .exists())
464 }
465
466 pub async fn read_tree(&self, tree_hash: &ObjectId) -> Result<Tree, WsvcFsError> {
468 let tree_path = self.trees_dir().await?.join(tree_hash.0.to_hex().as_str());
469 let result = serde_json::from_slice::<Tree>(&tokio::fs::read(tree_path).await?)?;
470 Ok(result)
471 }
472
473 #[async_recursion::async_recursion(?Send)]
475 pub async fn checkout_tree(&self, tree: &Tree, workspace: &Path) -> Result<(), WsvcFsError> {
476 let mut entries = read_dir(workspace).await?;
479 let mut should_be_del = vec![];
480 while let Some(entry) = entries.next_entry().await? {
481 should_be_del.push(entry.file_name());
482 }
483
484 for tree in &tree.trees {
485 let tree = self.read_tree(tree).await?;
486 let tree_path = workspace.join(&tree.name);
487 if !tree_path.exists() {
488 create_dir_all(&tree_path).await?;
489 } else {
490 if !tree_path.is_dir() {
491 remove_file(&tree_path).await?;
492 }
493 if let Some(pos) = should_be_del
494 .iter()
495 .position(|x| x == tree_path.file_name().unwrap_or_default())
496 {
497 should_be_del.remove(pos);
498 }
499 }
500 self.checkout_tree(&tree, &tree_path).await?;
501 }
502 for blob in &tree.blobs {
503 let blob_path = workspace.join(&blob.name);
504 if !blob_path.exists() || !blob.checksum(&blob_path).await? {
505 self.checkout_blob(&blob.hash, &workspace, &blob.name)
506 .await?;
507 }
508 if let Some(pos) = should_be_del
509 .iter()
510 .position(|x| x == blob_path.file_name().unwrap_or_default())
511 {
512 should_be_del.remove(pos);
513 }
514 }
515 for entry in should_be_del {
516 let entry_path = workspace.join(entry);
517 if entry_path.is_dir() {
518 if entry_path.file_name().unwrap().eq(".wsvc") {
519 continue;
520 }
521 remove_dir_all(entry_path).await?;
522 } else {
523 remove_file(entry_path).await?;
524 }
525 }
526 Ok(())
527 }
528
529 pub async fn store_record(&self, record: &Record) -> Result<(), WsvcFsError> {
531 let record_path = self
532 .records_dir()
533 .await?
534 .join(record.hash.0.to_hex().as_str());
535 write(record_path, serde_json::to_vec(record)?).await?;
536 Ok(())
537 }
538
539 pub async fn find_record_for_tree(
541 &self,
542 tree_id: &Hash,
543 ) -> Result<Option<Record>, WsvcFsError> {
544 let records = self.get_records().await?;
545 for record in records {
546 let tree = self.read_tree(&record.root).await?;
547 if tree.hash.0 == *tree_id {
548 return Ok(Some(record));
549 }
550 }
551 Ok(None)
552 }
553
554 pub async fn commit_record(
556 &self,
557 workspace: &Path,
558 author: impl AsRef<str>,
559 message: impl AsRef<str>,
560 ) -> Result<Record, WsvcFsError> {
561 let tree = self.write_tree_recursively(workspace).await?;
562 if !tree.1 {
563 if let Some(record) = self.find_record_for_tree(&tree.0.hash.0).await? {
564 return Err(WsvcFsError::NoChanges(
565 record.hash.0.to_hex().to_owned().to_string(),
566 ));
567 }
568 }
569 let record = Record {
570 hash: ObjectId(Hash::from([0; 32])),
571 message: String::from(message.as_ref()),
572 author: String::from(author.as_ref()),
573 date: chrono::Utc::now(),
574 root: tree.0.hash,
575 };
576 let hash = blake3::hash(serde_json::to_vec(&record)?.as_slice());
577 let record = Record {
578 hash: ObjectId(hash),
579 ..record
580 };
581 self.store_record(&record).await?;
583 write(self.path.join("HEAD"), hash.to_hex().to_string()).await?;
584 Ok(record)
585 }
586
587 pub async fn read_record(&self, record_hash: &ObjectId) -> Result<Record, WsvcFsError> {
589 let record_path = self
590 .records_dir()
591 .await?
592 .join(record_hash.0.to_hex().as_str());
593 let result = serde_json::from_slice::<Record>(&tokio::fs::read(record_path).await?)?;
594 Ok(result)
595 }
596
597 pub async fn checkout_record(
599 &self,
600 record_hash: &ObjectId,
601 workspace: &Path,
602 ) -> Result<Record, WsvcFsError> {
603 let record = self.read_record(record_hash).await?;
604 self.checkout_tree(&self.read_tree(&record.root).await?, workspace)
605 .await?;
606 write(self.path.join("HEAD"), record_hash.0.to_hex().to_string()).await?;
608 remove_dir_all(self.temp_dir().await?).await?;
609 Ok(record)
610 }
611
612 pub async fn get_records(&self) -> Result<Vec<Record>, WsvcFsError> {
614 let mut result = Vec::new();
615 let mut entries = read_dir(self.records_dir().await?).await?;
616 while let Some(entry) = entries.next_entry().await? {
617 let entry_type = entry.file_type().await?;
618 if entry_type.is_file() {
619 result.push(
620 self.read_record(&ObjectId(Hash::from_hex(
621 entry
622 .file_name()
623 .to_str()
624 .ok_or(WsvcFsError::InvalidOsString(format!("{:?}", entry)))?,
625 )?))
626 .await?,
627 );
628 }
629 }
630 Ok(result)
631 }
632
633 pub async fn get_trees_of_record(
635 &self,
636 record_hash: &ObjectId,
637 ) -> Result<Vec<Tree>, WsvcFsError> {
638 let record = self.read_record(record_hash).await?;
639 let mut result = Vec::new();
640 let mut queue = vec![record.root];
641 while let Some(tree_hash) = queue.pop() {
642 let tree = self.read_tree(&tree_hash).await?;
643 result.push(tree.clone());
644 for tree_hash in tree.trees {
645 queue.push(tree_hash);
646 }
647 }
648 Ok(result)
649 }
650
651 pub async fn get_blobs_of_tree(&self, tree_hash: &ObjectId) -> Result<Vec<Blob>, WsvcFsError> {
652 let tree = self.read_tree(tree_hash).await?;
653 let mut result = tree.blobs.clone();
654 let mut queue = tree.trees;
655 while let Some(tree_hash) = queue.pop() {
656 let tree = self.read_tree(&tree_hash).await?;
657 result.extend(tree.blobs.clone());
658 for tree_hash in tree.trees {
659 queue.push(tree_hash);
660 }
661 }
662 Ok(result)
663 }
664
665 pub async fn get_latest_record(&self) -> Result<Option<Record>, WsvcFsError> {
667 let mut records = self.get_records().await?;
668 if records.is_empty() {
669 return Ok(None);
670 }
671 records.sort_by(|a, b| b.date.cmp(&a.date));
672 Ok(Some(records[0].clone()))
673 }
674
675 pub async fn get_head_record(&self) -> Result<Option<Record>, WsvcFsError> {
677 let head_hash = read(self.path.join("HEAD")).await?;
678 if String::from_utf8(head_hash.clone())
679 .map_err(|err| WsvcFsError::InvalidOsString(format!("{:?}", err)))?
680 == *""
681 {
682 return Ok(None);
683 }
684 Ok(Some(
685 self.read_record(
686 &String::from_utf8(head_hash)
687 .map_err(|err| WsvcFsError::InvalidOsString(format!("{:?}", err)))?
688 .try_into()?,
689 )
690 .await?,
691 ))
692 }
693
694 pub async fn write_origin(&self, url: String) -> Result<(), WsvcFsError> {
695 write(self.path.join("ORIGIN"), url)
697 .await
698 .map_err(WsvcFsError::Os)
699 }
700
701 pub async fn read_origin(&self) -> Result<String, WsvcFsError> {
702 tokio::fs::read_to_string(self.path.join("ORIGIN"))
704 .await
705 .map_err(|_| WsvcFsError::RemoteNotSet)
706 }
707}
708
709impl Blob {
710 pub async fn checksum(&self, rel_path: impl AsRef<Path>) -> Result<bool, WsvcFsError> {
712 let mut file = File::open(rel_path).await?;
713 let mut buffer: [u8; 16384] = [0; 16384];
714 let mut hasher = blake3::Hasher::new();
715 loop {
716 let n = file.read(&mut buffer).await?;
717 if n == 0 {
718 break;
719 }
720 hasher.update(&buffer[..n]);
721 }
722 Ok(hasher.finalize() == self.hash.0)
723 }
724}