Skip to main content

quilt_rs/flow/
commit.rs

1use std::collections::BTreeMap;
2use std::collections::HashSet;
3use std::path::Path;
4use std::path::PathBuf;
5
6use tokio_stream::StreamExt;
7use tracing::debug;
8use tracing::info;
9use url::Url;
10
11use crate::Error;
12use crate::Res;
13use crate::error::ManifestError;
14use crate::error::PackageOpError;
15use crate::io::manifest::RowsStream;
16use crate::io::manifest::StreamRowsChunk;
17use crate::io::manifest::build_manifest_from_rows_stream;
18use crate::io::storage::Storage;
19use crate::lineage::Change;
20use crate::lineage::CommitState;
21use crate::lineage::InstalledPackageStatus;
22use crate::lineage::PackageLineage;
23use crate::lineage::PathState;
24use crate::manifest::Manifest;
25use crate::manifest::ManifestHeader;
26use crate::manifest::ManifestRow;
27use crate::manifest::Workflow;
28use crate::paths::DomainPaths;
29use quilt_uri::Namespace;
30
31async fn stream_local_with_changes(
32    local_manifest: &Manifest,
33    removed: HashSet<PathBuf>,
34    modified: BTreeMap<PathBuf, ManifestRow>,
35    new_files: StreamRowsChunk,
36) -> impl RowsStream {
37    // Collect all rows from the local manifest stream
38    let mut all_rows: Vec<Res<ManifestRow>> = Vec::new();
39
40    // Add new files to the collection
41    all_rows.extend(new_files);
42
43    // Process and add existing rows from the manifest
44    let mut stream = local_manifest.records_stream().await;
45    while let Some(chunk_result) = stream.next().await {
46        if let Ok(chunk) = chunk_result {
47            for row_res in chunk {
48                match row_res {
49                    Ok(row) => {
50                        // Skip removed rows
51                        if removed.contains(&row.logical_key) {
52                            continue;
53                        }
54
55                        // Use modified version if available, otherwise use original
56                        if let Some(modified_row) = modified.get(&row.logical_key) {
57                            all_rows.push(Ok(modified_row.clone()));
58                        } else {
59                            all_rows.push(Ok(row));
60                        }
61                    }
62                    Err(err) => {
63                        all_rows.push(Err(Error::Manifest(ManifestError::Table(err.to_string()))));
64                    }
65                }
66            }
67        }
68    }
69
70    // Sort all rows by name
71    all_rows.sort_by(|a, b| match (a, b) {
72        (Ok(row_a), Ok(row_b)) => row_a.logical_key.cmp(&row_b.logical_key),
73        (Ok(_), Err(_)) => std::cmp::Ordering::Less,
74        (Err(_), Ok(_)) => std::cmp::Ordering::Greater,
75        (Err(_), Err(_)) => std::cmp::Ordering::Equal,
76    });
77
78    // Convert back to a stream
79    tokio_stream::iter(vec![Ok(all_rows)])
80}
81
82async fn create_immutable_object_copy(
83    storage: &impl Storage,
84    paths: &DomainPaths,
85    working_dir: &Path,
86    lineage: &mut PackageLineage,
87    logical_key: &PathBuf,
88    current: ManifestRow,
89) -> Res<ManifestRow> {
90    debug!(
91        "⏳ Creating immutable object copy for: {}",
92        logical_key.display()
93    );
94    let objects_dir = paths.objects_dir();
95    let object_dest = objects_dir.join(hex::encode(current.hash.digest()));
96    let new_physical_key = Url::from_file_path(&object_dest)
97        .map_err(|()| {
98            Error::PackageOp(PackageOpError::Commit(format!(
99                "Failed to create URL from {}",
100                object_dest.display()
101            )))
102        })?
103        .to_string();
104
105    let current_hash = current.hash.clone();
106    let row = ManifestRow {
107        logical_key: logical_key.clone(),
108        physical_key: new_physical_key,
109        ..current
110    };
111
112    let work_dest = working_dir.join(logical_key);
113
114    if storage.exists(&object_dest).await {
115        debug!(
116            "✔️ Object already exists in storage: {}",
117            object_dest.display()
118        );
119    } else {
120        debug!(
121            "⏳ Copying file to objects directory: {}",
122            object_dest.display()
123        );
124        storage.copy(&work_dest, object_dest).await?;
125        debug!("✔️ File copied successfully");
126    }
127    lineage.paths.insert(
128        logical_key.clone(),
129        PathState {
130            timestamp: storage.modified_timestamp(&work_dest).await?,
131            hash: current_hash.into(),
132        },
133    );
134    Ok(row)
135}
136
// TODO: consider grouping the commit inputs into a struct, e.g.:
// pub struct Commit {
//     message: Option<String>,
//     user_meta: Option<serde_json::Value>,
//     workflow: Option<Workflow>,
// }
143
144/// Commit new commit with new `message`, `user_meta` and all changes got from calling `flow::status`
145///
146/// On `Ok`, the returned `CommitState` is also stored in `lineage.commit` —
147/// callers that need the new top hash should read it from the tuple rather
148/// than unwrapping `lineage.commit`.
149// TODO: move `working_dir` to `paths`, and `paths` to `storage`
150#[allow(clippy::too_many_arguments)]
151pub async fn commit_package(
152    mut lineage: PackageLineage,
153    manifest: &mut Manifest,
154    paths: &DomainPaths,
155    storage: &(impl Storage + Sync),
156    working_dir: PathBuf,
157    status: InstalledPackageStatus,
158    namespace: Namespace,
159    message: String,
160    user_meta: Option<serde_json::Value>,
161    workflow: Option<Workflow>,
162) -> Res<(PackageLineage, CommitState)> {
163    info!(
164        r#"⏳ Starting commit with message "{}" and user_meta `{:?}`"#,
165        message, user_meta
166    );
167
168    // create a new manifest based on the stored version
169
170    // for each modified file:
171    //   - compute the new hash
172    //   - store in the identity cache at $LOCAL/.quilt/objects/<hash>
173    //   - update the modified entries in the manifest with the new physical keys
174    //     pointing to the new objects in the identity cache
175    //   - ? set entry.meta.pulled_hashes to previous object hash?
176    //   - ? set entry.meta.remote_key to the remote's physical key?
177
178    // compute the new top hash
179    // store the new manifest under the new top hash at $LOCAL/.quilt/packages/<hash>
180    // XXX: prefix with the namespace?
181    // XXX: what to do on collisions?
182    //      e.g. when a file was changed, committed, and then reverted
183
184    // store revision pointers to the newly created manifest
185    //   - in the local registry??
186    //   - in the lineage
187    //     - commit:
188    //       - timestamp
189    //       - user ?
190    //       - multihash: new_top_hash
191    //       - pulled_hashes: [old_top_hash] ?
192    //       - paths:
193    //         - [modified file's path]:
194    //           - multihash
195    //           # XXX: do we actually need this? can be inferred from namespace + logical key
196    //           - remote_key: "s3://..." # no version id
197    //           - local_key: $LOCAL/.quilt/objects/<hash>
198    //           - pulled_hashes: [old_hash] ?
199    // NOTE: each commit MUST include all paths from prior commits
200    //       (since the last pull, until reset by a sync)
201
202    let mut modified_keys = BTreeMap::new();
203    let mut removed_keys = HashSet::new();
204    let mut new_files = Vec::new();
205    for (logical_key, state) in status.changes {
206        debug!(
207            "Processing change type {:?} for: {}",
208            state,
209            logical_key.display()
210        );
211        match state {
212            Change::Removed(row) => {
213                lineage.paths.remove(&row.logical_key);
214                removed_keys.insert(row.logical_key);
215            }
216            Change::Added(current) => {
217                if manifest.contains_record(&current.logical_key) {
218                    return Err(Error::PackageOp(PackageOpError::Commit(format!(
219                        "Trying to add a file that is already in the manifest: \"{}\"",
220                        current.logical_key.display()
221                    ))));
222                }
223                let added = create_immutable_object_copy(
224                    storage,
225                    paths,
226                    &working_dir,
227                    &mut lineage,
228                    &logical_key,
229                    current,
230                )
231                .await?;
232                new_files.push(Ok(added));
233            }
234            Change::Modified(current) => {
235                let modified = create_immutable_object_copy(
236                    storage,
237                    paths,
238                    &working_dir,
239                    &mut lineage,
240                    &logical_key,
241                    current,
242                )
243                .await?;
244                modified_keys.insert(logical_key.clone(), modified);
245            }
246        }
247    }
248
249    let processed_user_meta = match user_meta {
250        Some(serde_json::Value::Object(mut m)) => {
251            m.sort_keys();
252            Some(m.into())
253        }
254        other => other,
255    };
256
257    let header = ManifestHeader {
258        message: Some(message.clone()),
259        workflow,
260        user_meta: processed_user_meta,
261        ..ManifestHeader::default()
262    };
263
264    debug!(
265        "⏳ Building new manifest with {} removed, {} modified, {} new files",
266        removed_keys.len(),
267        modified_keys.len(),
268        new_files.len()
269    );
270    let stream = stream_local_with_changes(manifest, removed_keys, modified_keys, new_files).await;
271    let dest_dir = paths.installed_manifests_dir(&namespace);
272    let (manifest_path, new_top_hash) =
273        build_manifest_from_rows_stream(storage, dest_dir, header, stream).await?;
274    info!(
275        "✔️New manifest with {} was built in {}",
276        manifest_path.display(),
277        new_top_hash
278    );
279    let mut prev_hashes = Vec::new();
280    if let Some(commit) = lineage.commit {
281        prev_hashes.push(commit.hash.clone());
282        prev_hashes.extend(commit.prev_hashes.clone());
283    }
284    let commit = CommitState {
285        hash: new_top_hash,
286        timestamp: chrono::Utc::now(),
287        prev_hashes,
288    };
289    lineage.commit = Some(commit.clone());
290
291    info!(
292        "✔️ Successfully committed changes with hash: {}",
293        commit.hash
294    );
295    Ok((lineage, commit))
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301    use test_log::test;
302
303    use std::collections::BTreeMap;
304
305    use aws_sdk_s3::primitives::ByteStream;
306
307    use crate::fixtures;
308    use crate::io::storage::mocks::MockStorage;
309    use crate::lineage::Change;
310
    // NOTE: Tests use "/" as the root of the working directory, because the path is then parsed with `Url` and has to be an absolute path
312
313    #[test(tokio::test)]
314    async fn test_commit_empty() -> Res {
315        let storage = MockStorage::default();
316        let lineage = PackageLineage::default();
317        assert!(lineage.commit.is_none());
318        let (_lineage, commit) = commit_package(
319            lineage,
320            &mut Manifest::default(),
321            &DomainPaths::default(),
322            &storage,
323            PathBuf::default(),
324            InstalledPackageStatus::default(),
325            ("foo", "bar").into(),
326            String::default(),
327            None,
328            None,
329        )
330        .await?;
331        let hash = fixtures::top_hash::EMPTY_NONE_TOP_HASH;
332        assert!(
333            storage
334                .exists(&PathBuf::from(format!(".quilt/installed/foo/bar/{hash}")))
335                .await
336        );
337        assert_eq!(commit.hash, hash);
338        Ok(())
339    }
340
341    #[test(tokio::test)]
342    async fn test_commit_meta() -> Res {
343        let storage = MockStorage::default();
344
345        let commit_message = "Lorem ipsum".to_string();
346        let mut user_meta = serde_json::Map::new();
347        user_meta.insert(
348            "lorem".to_string(),
349            serde_json::Value::String("ipsum".to_string()),
350        );
351
352        let lineage = PackageLineage::default();
353        assert!(lineage.commit.is_none());
354        let (_lineage, commit) = commit_package(
355            lineage,
356            &mut Manifest::default(),
357            &DomainPaths::default(),
358            &storage,
359            PathBuf::default(),
360            InstalledPackageStatus::default(),
361            ("foo", "bar").into(),
362            commit_message,
363            Some(serde_json::Value::Object(user_meta)),
364            None,
365        )
366        .await?;
367        let hash = "56c329d2390c9c6efedb698f47b75f096112c89a7751d55a426507ec6c432897";
368        assert!(
369            storage
370                .exists(&PathBuf::from(format!(".quilt/installed/foo/bar/{hash}")))
371                .await
372        );
373        assert_eq!(commit.hash, hash);
374        Ok(())
375    }
376
377    #[test(tokio::test)]
378    async fn test_removing_and_commit() -> Res {
379        let storage = MockStorage::default();
380
381        let status = InstalledPackageStatus {
382            changes: BTreeMap::from([(
383                PathBuf::from("one/two two/three three three/READ ME.md"),
384                Change::Removed(ManifestRow {
385                    logical_key: PathBuf::from("one/two two/three three three/READ ME.md"),
386                    ..ManifestRow::default()
387                }),
388            )]),
389            ..InstalledPackageStatus::default()
390        };
391
392        let lineage = PackageLineage {
393            paths: BTreeMap::from([(
394                PathBuf::from("one/two two/three three three/READ ME.md"),
395                PathState::default(),
396            )]),
397            ..PackageLineage::default()
398        };
399        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;
400
401        assert!(
402            lineage.commit.is_none(),
403            "Initial lineage has commit already"
404        );
405        assert!(
406            lineage
407                .paths
408                .contains_key(&PathBuf::from("one/two two/three three three/READ ME.md")),
409            "Initial lineage doesn't have testing path"
410        );
411
412        let (lineage, commit) = commit_package(
413            lineage,
414            &mut manifest,
415            &DomainPaths::default(),
416            &storage,
417            PathBuf::default(),
418            status,
419            ("foo", "bar").into(),
420            String::from("Initial"),
421            Some(serde_json::json!({"A": "b", "z": "Y", "a": "B", "Z": "y"})),
422            None,
423        )
424        .await?;
425
426        let hash = "22590f2254e00b12f0c141117969172e925d6b8e9af26a04fa35658f1ad4e04c";
427        assert!(
428            !lineage
429                .paths
430                .contains_key(&PathBuf::from("one/two two/three three three/READ ME.md")),
431            "Commited lineage still has a path, that should be clear after commit"
432        );
433        assert!(
434            storage
435                .exists(&PathBuf::from(format!(".quilt/installed/foo/bar/{hash}")))
436                .await,
437            "Registry doesn't have installed package with a new hash"
438        );
439        assert_eq!(commit.hash, hash);
440
441        Ok(())
442    }
443
444    #[test(tokio::test)]
445    async fn test_adding_and_commit() -> Res {
446        let manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
447        let base_record = manifest.get_record(&PathBuf::from("0mb.bin")).unwrap();
448        let added_file = ManifestRow {
449            logical_key: PathBuf::from("foo"),
450            hash: base_record.hash.clone(),
451            size: base_record.size,
452            physical_key: base_record.physical_key.clone(),
453            ..ManifestRow::default()
454        };
455
456        let storage = MockStorage::default();
457        storage
458            .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
459            .await?;
460
461        let status = InstalledPackageStatus {
462            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added_file.clone()))]),
463            ..InstalledPackageStatus::default()
464        };
465
466        let lineage = PackageLineage::default();
467        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;
468
469        assert!(
470            lineage.commit.is_none(),
471            "Initial lineage has commit already"
472        );
473        assert!(
474            !lineage.paths.contains_key(&PathBuf::from("foo")),
475            "Initial lineage has path, but shouldn't because we test _new_ file"
476        );
477
478        let (lineage, commit) = commit_package(
479            lineage,
480            &mut manifest,
481            &DomainPaths::new(PathBuf::from("/")),
482            &storage,
483            PathBuf::from("/working-dir"),
484            status,
485            ("foo", "bar").into(),
486            String::from("Initial"),
487            Some(serde_json::json!({"A": "b", "z": "Y", "a": "B", "Z": "y"})),
488            None,
489        )
490        .await?;
491
492        let hash = fixtures::objects::ZERO_HASH_HEX;
493        assert!(
494            lineage.paths.contains_key(&PathBuf::from("foo")),
495            "Commited lineage doesn't have path, but should have. We added new file and it should be there."
496        );
497        assert!(
498            storage
499                .exists(&PathBuf::from(format!("/.quilt/objects/{hash}")))
500                .await,
501            "Registry doesn't have installed path"
502        );
503        assert_eq!(
504            commit.hash,
505            "e8fc7ccb96e87acd4ca02123e0c658ad92cdb2cc2822103d4f5bac79254cca08"
506        );
507
508        Ok(())
509    }
510
511    // It is no longer reproducible in tests
512    // and I doubt it ever could be reproducible at all
513    // TODO: anyway it makes sense to add some sanity checks
514    //       even for imposible states
515    #[test(tokio::test)]
516    async fn test_adding_manifest_already_has_it() -> Res {
517        let manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
518        let base_record = manifest
519            .get_record(&PathBuf::from("one/two two/three three three/READ ME.md"))
520            .unwrap();
521        let added_file = ManifestRow {
522            logical_key: PathBuf::from("one/two two/three three three/READ ME.md"),
523            hash: base_record.hash.clone(),
524            size: base_record.size,
525            physical_key: base_record.physical_key.clone(),
526            ..ManifestRow::default()
527        };
528        let hash = added_file.hash.clone();
529
530        let storage = MockStorage::default();
531        storage
532            .write_byte_stream(
533                PathBuf::from("one/two two/three three three/READ ME.md"),
534                ByteStream::from_static(b"This is the README."),
535            )
536            .await?;
537        storage
538            .write_byte_stream(
539                PathBuf::from(format!(".quilt/objects/{}", hex::encode(hash.digest()))),
540                ByteStream::from_static(b"This is the README."),
541            )
542            .await?;
543
544        let status = InstalledPackageStatus {
545            changes: BTreeMap::from([(
546                PathBuf::from("one/two two/three three three/READ ME.md"),
547                Change::Added(added_file.clone()),
548            )]),
549            ..InstalledPackageStatus::default()
550        };
551
552        let lineage = PackageLineage {
553            paths: BTreeMap::from([(
554                PathBuf::from("one/two two/three three three/READ ME.md"),
555                PathState::default(),
556            )]),
557            ..PackageLineage::default()
558        };
559        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;
560
561        let result = commit_package(
562            lineage,
563            &mut manifest,
564            &DomainPaths::new(PathBuf::from("/")),
565            &storage,
566            PathBuf::default(),
567            status,
568            ("foo", "bar").into(),
569            String::from("Initial"),
570            Some(serde_json::json!({"A": "b", "z": "Y", "a": "B", "Z": "y"})),
571            None,
572        )
573        .await;
574
575        assert_eq!(
576            result.unwrap_err().to_string(),
577            "Commit error: Trying to add a file that is already in the manifest: \"one/two two/three three three/READ ME.md\""
578        );
579
580        Ok(())
581    }
582
583    #[test(tokio::test)]
584    async fn test_modifying_and_commit() -> Res {
585        let storage = MockStorage::default();
586        storage
587            .write_byte_stream(
588                PathBuf::from("/working-dir/one/two two/three three three/READ ME.md"),
589                ByteStream::from_static(fixtures::objects::less_than_8mb()),
590            )
591            .await?;
592
593        let manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
594        let base_record = manifest
595            .get_record(&PathBuf::from("less-then-8mb.txt"))
596            .unwrap();
597        let modified_file = ManifestRow {
598            logical_key: PathBuf::from("one/two two/three three three/READ ME.md"),
599            hash: base_record.hash.clone(),
600            size: base_record.size,
601            physical_key: base_record.physical_key.clone(),
602            ..ManifestRow::default()
603        };
604        let status = InstalledPackageStatus {
605            changes: BTreeMap::from([(
606                PathBuf::from("one/two two/three three three/READ ME.md"),
607                Change::Modified(modified_file),
608            )]),
609            ..InstalledPackageStatus::default()
610        };
611
612        let lineage = PackageLineage {
613            paths: BTreeMap::from([(
614                PathBuf::from("one/two two/three three three/READ ME.md"),
615                PathState::default(),
616            )]),
617            ..PackageLineage::default()
618        };
619        let mut manifest = crate::fixtures::manifest_with_objects_all_sizes::manifest().await?;
620
621        assert!(
622            lineage.commit.is_none(),
623            "Initial lineage has commit already"
624        );
625        assert!(
626            lineage
627                .paths
628                .contains_key(&PathBuf::from("one/two two/three three three/READ ME.md")),
629            "Initial lineage doesn't have path, but should because we test installed and modified file"
630        );
631
632        let (lineage, commit) = commit_package(
633            lineage,
634            &mut manifest,
635            &DomainPaths::new(PathBuf::from("/")),
636            &storage,
637            PathBuf::from("/working-dir"),
638            status,
639            ("foo", "bar").into(),
640            String::from("Initial"),
641            Some(serde_json::json!({"A": "b", "z": "Y", "a": "B", "Z": "y"})),
642            None,
643        )
644        .await?;
645
646        assert!(
647            lineage
648                .paths
649                .contains_key(&PathBuf::from("one/two two/three three three/READ ME.md")),
650            "Commited lineage doesn't have path, but should have. We added new file and it should be there."
651        );
652        assert!(
653            storage
654                .exists(&PathBuf::from(format!(
655                    "/.quilt/objects/{}",
656                    fixtures::objects::LESS_THAN_8MB_HASH_HEX
657                )))
658                .await,
659            "Registry doesn't have installed path"
660        );
661        assert_eq!(
662            commit.hash,
663            "39bbc9a95f787cd938fb5830abe5e25408f0aac4000528b8717130be5f7bc2b3"
664        );
665
666        Ok(())
667    }
668}