1use std::collections::BTreeMap;
2use std::collections::HashSet;
3use std::path::Path;
4use std::path::PathBuf;
5
6use tokio_stream::StreamExt;
7use tracing::debug;
8use tracing::info;
9use url::Url;
10
11use crate::Error;
12use crate::Res;
13use crate::error::ManifestError;
14use crate::error::PackageOpError;
15use crate::io::manifest::RowsStream;
16use crate::io::manifest::StreamRowsChunk;
17use crate::io::manifest::build_manifest_from_rows_stream;
18use crate::io::storage::Storage;
19use crate::lineage::Change;
20use crate::lineage::CommitState;
21use crate::lineage::InstalledPackageStatus;
22use crate::lineage::PackageLineage;
23use crate::lineage::PathState;
24use crate::manifest::Manifest;
25use crate::manifest::ManifestHeader;
26use crate::manifest::ManifestRow;
27use crate::manifest::Workflow;
28use crate::paths::DomainPaths;
29use quilt_uri::Namespace;
30
31async fn stream_local_with_changes(
32 local_manifest: &Manifest,
33 removed: HashSet<PathBuf>,
34 modified: BTreeMap<PathBuf, ManifestRow>,
35 new_files: StreamRowsChunk,
36) -> impl RowsStream {
37 let mut all_rows: Vec<Res<ManifestRow>> = Vec::new();
39
40 all_rows.extend(new_files);
42
43 let mut stream = local_manifest.records_stream().await;
45 while let Some(chunk_result) = stream.next().await {
46 if let Ok(chunk) = chunk_result {
47 for row_res in chunk {
48 match row_res {
49 Ok(row) => {
50 if removed.contains(&row.logical_key) {
52 continue;
53 }
54
55 if let Some(modified_row) = modified.get(&row.logical_key) {
57 all_rows.push(Ok(modified_row.clone()));
58 } else {
59 all_rows.push(Ok(row));
60 }
61 }
62 Err(err) => {
63 all_rows.push(Err(Error::Manifest(ManifestError::Table(err.to_string()))));
64 }
65 }
66 }
67 }
68 }
69
70 all_rows.sort_by(|a, b| match (a, b) {
72 (Ok(row_a), Ok(row_b)) => row_a.logical_key.cmp(&row_b.logical_key),
73 (Ok(_), Err(_)) => std::cmp::Ordering::Less,
74 (Err(_), Ok(_)) => std::cmp::Ordering::Greater,
75 (Err(_), Err(_)) => std::cmp::Ordering::Equal,
76 });
77
78 tokio_stream::iter(vec![Ok(all_rows)])
80}
81
82async fn create_immutable_object_copy(
83 storage: &impl Storage,
84 paths: &DomainPaths,
85 working_dir: &Path,
86 lineage: &mut PackageLineage,
87 logical_key: &PathBuf,
88 current: ManifestRow,
89) -> Res<ManifestRow> {
90 debug!(
91 "⏳ Creating immutable object copy for: {}",
92 logical_key.display()
93 );
94 let objects_dir = paths.objects_dir();
95 let object_dest = objects_dir.join(hex::encode(current.hash.digest()));
96 let new_physical_key = Url::from_file_path(&object_dest)
97 .map_err(|()| {
98 Error::PackageOp(PackageOpError::Commit(format!(
99 "Failed to create URL from {}",
100 object_dest.display()
101 )))
102 })?
103 .to_string();
104
105 let current_hash = current.hash.clone();
106 let row = ManifestRow {
107 logical_key: logical_key.clone(),
108 physical_key: new_physical_key,
109 ..current
110 };
111
112 let work_dest = working_dir.join(logical_key);
113
114 if storage.exists(&object_dest).await {
115 debug!(
116 "✔️ Object already exists in storage: {}",
117 object_dest.display()
118 );
119 } else {
120 debug!(
121 "⏳ Copying file to objects directory: {}",
122 object_dest.display()
123 );
124 storage.copy(&work_dest, object_dest).await?;
125 debug!("✔️ File copied successfully");
126 }
127 lineage.paths.insert(
128 logical_key.clone(),
129 PathState {
130 timestamp: storage.modified_timestamp(&work_dest).await?,
131 hash: current_hash.into(),
132 },
133 );
134 Ok(row)
135}
136
137#[allow(clippy::too_many_arguments)]
151pub async fn commit_package(
152 mut lineage: PackageLineage,
153 manifest: &mut Manifest,
154 paths: &DomainPaths,
155 storage: &(impl Storage + Sync),
156 working_dir: PathBuf,
157 status: InstalledPackageStatus,
158 namespace: Namespace,
159 message: String,
160 user_meta: Option<serde_json::Value>,
161 workflow: Option<Workflow>,
162) -> Res<(PackageLineage, CommitState)> {
163 info!(
164 r#"⏳ Starting commit with message "{}" and user_meta `{:?}`"#,
165 message, user_meta
166 );
167
168 let mut modified_keys = BTreeMap::new();
203 let mut removed_keys = HashSet::new();
204 let mut new_files = Vec::new();
205 for (logical_key, state) in status.changes {
206 debug!(
207 "Processing change type {:?} for: {}",
208 state,
209 logical_key.display()
210 );
211 match state {
212 Change::Removed(row) => {
213 lineage.paths.remove(&row.logical_key);
214 removed_keys.insert(row.logical_key);
215 }
216 Change::Added(current) => {
217 if manifest.contains_record(¤t.logical_key) {
218 return Err(Error::PackageOp(PackageOpError::Commit(format!(
219 "Trying to add a file that is already in the manifest: \"{}\"",
220 current.logical_key.display()
221 ))));
222 }
223 let added = create_immutable_object_copy(
224 storage,
225 paths,
226 &working_dir,
227 &mut lineage,
228 &logical_key,
229 current,
230 )
231 .await?;
232 new_files.push(Ok(added));
233 }
234 Change::Modified(current) => {
235 let modified = create_immutable_object_copy(
236 storage,
237 paths,
238 &working_dir,
239 &mut lineage,
240 &logical_key,
241 current,
242 )
243 .await?;
244 modified_keys.insert(logical_key.clone(), modified);
245 }
246 }
247 }
248
249 let processed_user_meta = match user_meta {
250 Some(serde_json::Value::Object(mut m)) => {
251 m.sort_keys();
252 Some(m.into())
253 }
254 other => other,
255 };
256
257 let header = ManifestHeader {
258 message: Some(message.clone()),
259 workflow,
260 user_meta: processed_user_meta,
261 ..ManifestHeader::default()
262 };
263
264 debug!(
265 "⏳ Building new manifest with {} removed, {} modified, {} new files",
266 removed_keys.len(),
267 modified_keys.len(),
268 new_files.len()
269 );
270 let stream = stream_local_with_changes(manifest, removed_keys, modified_keys, new_files).await;
271 let dest_dir = paths.installed_manifests_dir(&namespace);
272 let (manifest_path, new_top_hash) =
273 build_manifest_from_rows_stream(storage, dest_dir, header, stream).await?;
274 info!(
275 "✔️New manifest with {} was built in {}",
276 manifest_path.display(),
277 new_top_hash
278 );
279 let mut prev_hashes = Vec::new();
280 if let Some(commit) = lineage.commit {
281 prev_hashes.push(commit.hash.clone());
282 prev_hashes.extend(commit.prev_hashes.clone());
283 }
284 let commit = CommitState {
285 hash: new_top_hash,
286 timestamp: chrono::Utc::now(),
287 prev_hashes,
288 };
289 lineage.commit = Some(commit.clone());
290
291 info!(
292 "✔️ Successfully committed changes with hash: {}",
293 commit.hash
294 );
295 Ok((lineage, commit))
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;

    use std::collections::BTreeMap;

    use aws_sdk_s3::primitives::ByteStream;

    use crate::fixtures;
    use crate::io::storage::mocks::MockStorage;
    use crate::lineage::Change;

    /// Logical key of the README entry used by most tests below.
    const README_KEY: &str = "one/two two/three three three/READ ME.md";

    /// The README logical key as an owned path.
    fn readme_key() -> PathBuf {
        PathBuf::from(README_KEY)
    }

    /// User metadata with mixed-case keys, shared by the tests that commit changes.
    fn mixed_case_user_meta() -> Option<serde_json::Value> {
        Some(serde_json::json!({"A": "b", "z": "Y", "a": "B", "Z": "y"}))
    }

    /// A package status that contains exactly one pending change.
    fn status_with_change(key: PathBuf, change: Change) -> InstalledPackageStatus {
        InstalledPackageStatus {
            changes: BTreeMap::from([(key, change)]),
            ..InstalledPackageStatus::default()
        }
    }

    /// A lineage without a commit that tracks exactly one path with a default state.
    fn lineage_tracking(key: PathBuf) -> PackageLineage {
        PackageLineage {
            paths: BTreeMap::from([(key, PathState::default())]),
            ..PackageLineage::default()
        }
    }

    /// Committing without changes and without metadata yields the well-known empty hash.
    #[test(tokio::test)]
    async fn test_commit_empty() -> Res {
        let storage = MockStorage::default();
        let initial_lineage = PackageLineage::default();
        assert!(initial_lineage.commit.is_none());

        let (_lineage, commit) = commit_package(
            initial_lineage,
            &mut Manifest::default(),
            &DomainPaths::default(),
            &storage,
            PathBuf::default(),
            InstalledPackageStatus::default(),
            ("foo", "bar").into(),
            String::default(),
            None,
            None,
        )
        .await?;

        let hash = fixtures::top_hash::EMPTY_NONE_TOP_HASH;
        let installed_path = PathBuf::from(format!(".quilt/installed/foo/bar/{hash}"));
        assert!(storage.exists(&installed_path).await);
        assert_eq!(commit.hash, hash);
        Ok(())
    }

    /// Commit message and user metadata are part of the resulting top hash.
    #[test(tokio::test)]
    async fn test_commit_meta() -> Res {
        let storage = MockStorage::default();

        let commit_message = String::from("Lorem ipsum");
        let user_meta = serde_json::json!({"lorem": "ipsum"});

        let initial_lineage = PackageLineage::default();
        assert!(initial_lineage.commit.is_none());

        let (_lineage, commit) = commit_package(
            initial_lineage,
            &mut Manifest::default(),
            &DomainPaths::default(),
            &storage,
            PathBuf::default(),
            InstalledPackageStatus::default(),
            ("foo", "bar").into(),
            commit_message,
            Some(user_meta),
            None,
        )
        .await?;

        let hash = "56c329d2390c9c6efedb698f47b75f096112c89a7751d55a426507ec6c432897";
        let installed_path = PathBuf::from(format!(".quilt/installed/foo/bar/{hash}"));
        assert!(storage.exists(&installed_path).await);
        assert_eq!(commit.hash, hash);
        Ok(())
    }

    /// A removed file disappears from the lineage and a new manifest gets installed.
    #[test(tokio::test)]
    async fn test_removing_and_commit() -> Res {
        let storage = MockStorage::default();

        let removed_row = ManifestRow {
            logical_key: readme_key(),
            ..ManifestRow::default()
        };
        let status = status_with_change(readme_key(), Change::Removed(removed_row));

        let lineage = lineage_tracking(readme_key());
        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;

        assert!(
            lineage.commit.is_none(),
            "Initial lineage has commit already"
        );
        assert!(
            lineage.paths.contains_key(&readme_key()),
            "Initial lineage doesn't have testing path"
        );

        let (lineage, commit) = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::default(),
            &storage,
            PathBuf::default(),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            mixed_case_user_meta(),
            None,
        )
        .await?;

        let hash = "22590f2254e00b12f0c141117969172e925d6b8e9af26a04fa35658f1ad4e04c";
        assert!(
            !lineage.paths.contains_key(&readme_key()),
            "Commited lineage still has a path, that should be clear after commit"
        );
        let installed_path = PathBuf::from(format!(".quilt/installed/foo/bar/{hash}"));
        assert!(
            storage.exists(&installed_path).await,
            "Registry doesn't have installed package with a new hash"
        );
        assert_eq!(commit.hash, hash);

        Ok(())
    }

    /// A new file is copied into the objects directory and tracked by the lineage.
    #[test(tokio::test)]
    async fn test_adding_and_commit() -> Res {
        // Borrow hash, size and physical key from an existing fixture record.
        let fixture_manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let base_record = fixture_manifest
            .get_record(&PathBuf::from("0mb.bin"))
            .unwrap();
        let added_file = ManifestRow {
            logical_key: PathBuf::from("foo"),
            hash: base_record.hash.clone(),
            size: base_record.size,
            physical_key: base_record.physical_key.clone(),
            ..ManifestRow::default()
        };

        let storage = MockStorage::default();
        storage
            .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
            .await?;

        let status = status_with_change(PathBuf::from("foo"), Change::Added(added_file));

        let lineage = PackageLineage::default();
        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;

        assert!(
            lineage.commit.is_none(),
            "Initial lineage has commit already"
        );
        assert!(
            !lineage.paths.contains_key(&PathBuf::from("foo")),
            "Initial lineage has path, but shouldn't because we test _new_ file"
        );

        let (lineage, commit) = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::new(PathBuf::from("/")),
            &storage,
            PathBuf::from("/working-dir"),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            mixed_case_user_meta(),
            None,
        )
        .await?;

        let hash = fixtures::objects::ZERO_HASH_HEX;
        assert!(
            lineage.paths.contains_key(&PathBuf::from("foo")),
            "Commited lineage doesn't have path, but should have. We added new file and it should be there."
        );
        let object_path = PathBuf::from(format!("/.quilt/objects/{hash}"));
        assert!(
            storage.exists(&object_path).await,
            "Registry doesn't have installed path"
        );
        assert_eq!(
            commit.hash,
            "e8fc7ccb96e87acd4ca02123e0c658ad92cdb2cc2822103d4f5bac79254cca08"
        );

        Ok(())
    }

    /// Adding a file whose logical key is already in the manifest is rejected.
    #[test(tokio::test)]
    async fn test_adding_manifest_already_has_it() -> Res {
        let fixture_manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let base_record = fixture_manifest.get_record(&readme_key()).unwrap();
        let added_file = ManifestRow {
            logical_key: readme_key(),
            hash: base_record.hash.clone(),
            size: base_record.size,
            physical_key: base_record.physical_key.clone(),
            ..ManifestRow::default()
        };
        let hash = added_file.hash.clone();

        let storage = MockStorage::default();
        storage
            .write_byte_stream(
                readme_key(),
                ByteStream::from_static(b"This is the README."),
            )
            .await?;
        storage
            .write_byte_stream(
                PathBuf::from(format!(".quilt/objects/{}", hex::encode(hash.digest()))),
                ByteStream::from_static(b"This is the README."),
            )
            .await?;

        let status = status_with_change(readme_key(), Change::Added(added_file));

        let lineage = lineage_tracking(readme_key());
        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;

        let result = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::new(PathBuf::from("/")),
            &storage,
            PathBuf::default(),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            mixed_case_user_meta(),
            None,
        )
        .await;

        assert_eq!(
            result.unwrap_err().to_string(),
            "Commit error: Trying to add a file that is already in the manifest: \"one/two two/three three three/READ ME.md\""
        );

        Ok(())
    }

    /// A modified file is copied into the objects directory and stays in the lineage.
    #[test(tokio::test)]
    async fn test_modifying_and_commit() -> Res {
        let storage = MockStorage::default();
        storage
            .write_byte_stream(
                PathBuf::from(format!("/working-dir/{}", README_KEY)),
                ByteStream::from_static(fixtures::objects::less_than_8mb()),
            )
            .await?;

        // The modified README takes hash, size and physical key of another fixture record.
        let fixture_manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let base_record = fixture_manifest
            .get_record(&PathBuf::from("less-then-8mb.txt"))
            .unwrap();
        let modified_file = ManifestRow {
            logical_key: readme_key(),
            hash: base_record.hash.clone(),
            size: base_record.size,
            physical_key: base_record.physical_key.clone(),
            ..ManifestRow::default()
        };
        let status = status_with_change(readme_key(), Change::Modified(modified_file));

        let lineage = lineage_tracking(readme_key());
        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;

        assert!(
            lineage.commit.is_none(),
            "Initial lineage has commit already"
        );
        assert!(
            lineage.paths.contains_key(&readme_key()),
            "Initial lineage doesn't have path, but should because we test installed and modified file"
        );

        let (lineage, commit) = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::new(PathBuf::from("/")),
            &storage,
            PathBuf::from("/working-dir"),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            mixed_case_user_meta(),
            None,
        )
        .await?;

        assert!(
            lineage.paths.contains_key(&readme_key()),
            "Commited lineage doesn't have path, but should have. We added new file and it should be there."
        );
        let object_path = PathBuf::from(format!(
            "/.quilt/objects/{}",
            fixtures::objects::LESS_THAN_8MB_HASH_HEX
        ));
        assert!(
            storage.exists(&object_path).await,
            "Registry doesn't have installed path"
        );
        assert_eq!(
            commit.hash,
            "39bbc9a95f787cd938fb5830abe5e25408f0aac4000528b8717130be5f7bc2b3"
        );

        Ok(())
    }
}