Skip to main content

harn_vm/flow/
backend.rs

//! Harn Flow VCS backend abstraction.
//!
//! The Phase 0 backend is [`ShadowGitBackend`]: it stores every emitted atom as
//! a git commit on a sidecar ref (`refs/flow/atoms/<atom-id>`) without touching
//! the user worktree. Later phases can replace the storage substrate by
//! implementing [`VcsBackend`] directly.
7
8use std::collections::HashSet;
9use std::fmt;
10use std::path::{Path, PathBuf};
11use std::process::{Command, Stdio};
12
13use serde::{Deserialize, Serialize};
14use sha2::{Digest, Sha256};
15
16use super::slice::SliceId;
17use super::{Atom, AtomError, AtomId};
18
/// Ref namespace under which each atom gets its own ref (`<prefix>/<atom-id>`).
const ATOM_REF_PREFIX: &str = "refs/flow/atoms";
/// Ref namespace under which each shipped slice gets its own ref (`<prefix>/<slice-id>`).
const SLICE_REF_PREFIX: &str = "refs/flow/slices";
21
22/// Errors produced by Flow VCS backends.
23#[derive(Debug)]
24pub enum VcsBackendError {
25    /// Backend configuration or caller input is invalid.
26    Invalid(String),
27    /// A requested atom, slice, commit, or ref is missing.
28    NotFound(String),
29    /// The backend intentionally does not implement this operation yet.
30    Unsupported(String),
31    /// Atom encoding, decoding, or validation failed.
32    Atom(AtomError),
33    /// JSON encoding or decoding failed.
34    Json(String),
35    /// SQLite storage failed.
36    Sqlite(String),
37    /// A git command failed.
38    Git {
39        args: Vec<String>,
40        status: Option<i32>,
41        stderr: String,
42    },
43    /// The process could not be spawned or joined.
44    Io(String),
45}
46
47impl fmt::Display for VcsBackendError {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            VcsBackendError::Invalid(message) => write!(f, "vcs backend invalid: {message}"),
51            VcsBackendError::NotFound(message) => write!(f, "vcs backend not found: {message}"),
52            VcsBackendError::Unsupported(message) => {
53                write!(f, "vcs backend unsupported: {message}")
54            }
55            VcsBackendError::Atom(error) => write!(f, "{error}"),
56            VcsBackendError::Json(message) => write!(f, "vcs backend json error: {message}"),
57            VcsBackendError::Sqlite(message) => write!(f, "vcs backend sqlite error: {message}"),
58            VcsBackendError::Git {
59                args,
60                status,
61                stderr,
62            } => write!(
63                f,
64                "git {:?} failed with status {:?}: {}",
65                args,
66                status,
67                stderr.trim()
68            ),
69            VcsBackendError::Io(message) => write!(f, "vcs backend io error: {message}"),
70        }
71    }
72}
73
// Marker impl: no `source()` override, so the default (no underlying source) applies.
impl std::error::Error for VcsBackendError {}
75
76impl From<AtomError> for VcsBackendError {
77    fn from(error: AtomError) -> Self {
78        Self::Atom(error)
79    }
80}
81
82impl From<serde_json::Error> for VcsBackendError {
83    fn from(error: serde_json::Error) -> Self {
84        Self::Json(error.to_string())
85    }
86}
87
88impl From<rusqlite::Error> for VcsBackendError {
89    fn from(error: rusqlite::Error) -> Self {
90        Self::Sqlite(error.to_string())
91    }
92}
93
94impl From<std::io::Error> for VcsBackendError {
95    fn from(error: std::io::Error) -> Self {
96        Self::Io(error.to_string())
97    }
98}
99
100/// A candidate shippable unit represented as an ordered atom closure.
101#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
102pub struct FlowSlice {
103    pub id: SliceId,
104    pub atoms: Vec<AtomId>,
105}
106
107impl FlowSlice {
108    /// Build a deterministic slice from an ordered atom set.
109    pub fn new(atoms: Vec<AtomId>) -> Result<Self, VcsBackendError> {
110        if atoms.is_empty() {
111            return Err(VcsBackendError::Invalid(
112                "slice must contain at least one atom".to_string(),
113            ));
114        }
115        let mut hasher = Sha256::new();
116        hasher.update(b"FSLI");
117        for atom in &atoms {
118            hasher.update(atom.0);
119        }
120        Ok(Self {
121            id: SliceId(hasher.finalize().into()),
122            atoms,
123        })
124    }
125}
126
127/// Location of an atom in a VCS backend.
128#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
129pub struct AtomRef {
130    pub atom_id: AtomId,
131    pub commit: String,
132    pub ref_name: String,
133}
134
135/// Receipt returned after a slice is made visible for shipping.
136#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
137pub struct ShipReceipt {
138    pub slice_id: SliceId,
139    pub commit: String,
140    pub ref_name: String,
141}
142
143/// Receipt returned after exporting a Flow slice into a git ref.
144#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
145pub struct GitExportReceipt {
146    pub slice_id: SliceId,
147    pub commit: String,
148    pub ref_name: String,
149}
150
151/// Storage and shipping abstraction for Harn Flow atoms and slices.
152pub trait VcsBackend {
153    /// Persist one atom and return its backend location.
154    fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError>;
155    /// Derive a shippable slice from an ordered atom set.
156    fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError>;
157    /// Publish a slice in the backend's native shipping surface.
158    fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError>;
159    /// List persisted atoms.
160    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError>;
161    /// Load atoms in the order recorded by a slice.
162    fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError>;
163    /// Export a slice into a git ref.
164    fn export_git(
165        &self,
166        slice: &FlowSlice,
167        ref_name: &str,
168    ) -> Result<GitExportReceipt, VcsBackendError>;
169    /// Import a git ref containing ShadowGit atom commits as a Flow slice.
170    fn import_git(&self, ref_name: &str) -> Result<FlowSlice, VcsBackendError>;
171}
172
/// Phase 0 Flow backend that keeps its data in a git repository.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ShadowGitBackend {
    /// Worktree root in which every git command is executed.
    repo_root: PathBuf,
}
178
179impl ShadowGitBackend {
180    /// Create a backend rooted at an existing git worktree.
181    pub fn new(repo_root: impl Into<PathBuf>) -> Result<Self, VcsBackendError> {
182        let repo_root = repo_root.into();
183        let output = git_output_at(&repo_root, &["rev-parse", "--show-toplevel"], None)?;
184        let canonical = PathBuf::from(output.trim());
185        Ok(Self {
186            repo_root: canonical,
187        })
188    }
189
190    /// The canonical git worktree root used for all commands.
191    pub fn repo_root(&self) -> &Path {
192        &self.repo_root
193    }
194
195    fn atom_ref_name(atom_id: AtomId) -> String {
196        format!("{ATOM_REF_PREFIX}/{atom_id}")
197    }
198
199    fn slice_ref_name(slice_id: SliceId) -> String {
200        format!("{SLICE_REF_PREFIX}/{slice_id}")
201    }
202
203    fn atom_commit(&self, atom_id: AtomId) -> Result<String, VcsBackendError> {
204        let ref_name = Self::atom_ref_name(atom_id);
205        git_output_at(
206            &self.repo_root,
207            &["rev-parse", &format!("{ref_name}^{{commit}}")],
208            None,
209        )
210        .map(|commit| commit.trim().to_string())
211        .map_err(|error| match error {
212            VcsBackendError::Git { .. } => {
213                VcsBackendError::NotFound(format!("atom {atom_id} has no ShadowGit ref"))
214            }
215            other => other,
216        })
217    }
218
219    fn atom_from_commit(&self, commit: &str) -> Result<Atom, VcsBackendError> {
220        let payload = git_output_at(
221            &self.repo_root,
222            &["show", &format!("{commit}:atom.json")],
223            None,
224        )
225        .map_err(|error| match error {
226            VcsBackendError::Git { .. } => VcsBackendError::NotFound(format!(
227                "commit {commit} does not contain a ShadowGit atom payload"
228            )),
229            other => other,
230        })?;
231        let atom = Atom::from_json_slice(payload.as_bytes())?;
232        Ok(atom)
233    }
234
235    fn commit_for_slice(&self, slice: &FlowSlice) -> Result<String, VcsBackendError> {
236        let tail = slice
237            .atoms
238            .last()
239            .copied()
240            .ok_or_else(|| VcsBackendError::Invalid("slice must contain atoms".to_string()))?;
241        self.atom_commit(tail)
242    }
243
244    fn append_atom_closure(
245        &self,
246        atom_id: AtomId,
247        seen: &mut HashSet<AtomId>,
248        out: &mut Vec<AtomId>,
249    ) -> Result<(), VcsBackendError> {
250        if !seen.insert(atom_id) {
251            return Ok(());
252        }
253
254        let commit = self.atom_commit(atom_id)?;
255        let atom = self.atom_from_commit(&commit)?;
256        if atom.id != atom_id {
257            return Err(VcsBackendError::Invalid(format!(
258                "commit {commit} payload id {} did not match requested {atom_id}",
259                atom.id
260            )));
261        }
262        for parent in &atom.parents {
263            self.append_atom_closure(*parent, seen, out)?;
264        }
265        out.push(atom_id);
266        Ok(())
267    }
268
269    fn update_ref(&self, ref_name: &str, commit: &str) -> Result<(), VcsBackendError> {
270        validate_ref_name(&self.repo_root, ref_name)?;
271        git_output_at(&self.repo_root, &["update-ref", ref_name, commit], None)?;
272        Ok(())
273    }
274}
275
276impl VcsBackend for ShadowGitBackend {
277    fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError> {
278        atom.verify()?;
279
280        let ref_name = Self::atom_ref_name(atom.id);
281        if let Ok(commit) = self.atom_commit(atom.id) {
282            return Ok(AtomRef {
283                atom_id: atom.id,
284                commit,
285                ref_name,
286            });
287        }
288
289        let payload = atom.to_json()?;
290        let blob = git_output_at(
291            &self.repo_root,
292            &["hash-object", "-w", "--stdin"],
293            Some(payload.as_bytes()),
294        )?;
295        let tree_input = format!("100644 blob {}\tatom.json\n", blob.trim());
296        let tree = git_output_at(&self.repo_root, &["mktree"], Some(tree_input.as_bytes()))?;
297
298        let mut commit_args = vec!["commit-tree".to_string(), tree.trim().to_string()];
299        for parent in &atom.parents {
300            let parent_commit = self.atom_commit(*parent)?;
301            commit_args.push("-p".to_string());
302            commit_args.push(parent_commit);
303        }
304        commit_args.push("-m".to_string());
305        commit_args.push(format!("flow atom {}", atom.id));
306
307        let commit = git_output_at_owned(&self.repo_root, &commit_args, None)?;
308        let commit = commit.trim().to_string();
309        self.update_ref(&ref_name, &commit)?;
310        Ok(AtomRef {
311            atom_id: atom.id,
312            commit,
313            ref_name,
314        })
315    }
316
317    fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
318        let mut seen = HashSet::new();
319        let mut closure = Vec::new();
320        for atom in atoms {
321            self.append_atom_closure(*atom, &mut seen, &mut closure)?;
322        }
323        FlowSlice::new(closure)
324    }
325
326    fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
327        let commit = self.commit_for_slice(slice)?;
328        let ref_name = Self::slice_ref_name(slice.id);
329        self.update_ref(&ref_name, &commit)?;
330        Ok(ShipReceipt {
331            slice_id: slice.id,
332            commit,
333            ref_name,
334        })
335    }
336
337    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
338        let output = git_output_at(
339            &self.repo_root,
340            &[
341                "for-each-ref",
342                "--format=%(refname) %(objectname)",
343                ATOM_REF_PREFIX,
344            ],
345            None,
346        )?;
347        let mut atoms = Vec::new();
348        for line in output.lines().filter(|line| !line.trim().is_empty()) {
349            let (ref_name, commit) = line
350                .split_once(' ')
351                .ok_or_else(|| VcsBackendError::Invalid(format!("malformed ref line: {line}")))?;
352            let raw_id = ref_name
353                .strip_prefix(&format!("{ATOM_REF_PREFIX}/"))
354                .ok_or_else(|| {
355                    VcsBackendError::Invalid(format!("unexpected atom ref {ref_name}"))
356                })?;
357            atoms.push(AtomRef {
358                atom_id: AtomId::from_hex(raw_id)?,
359                commit: commit.to_string(),
360                ref_name: ref_name.to_string(),
361            });
362        }
363        atoms.sort_by_key(|atom| atom.atom_id.0);
364        Ok(atoms)
365    }
366
367    fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
368        slice
369            .atoms
370            .iter()
371            .map(|atom_id| {
372                let commit = self.atom_commit(*atom_id)?;
373                let atom = self.atom_from_commit(&commit)?;
374                if atom.id != *atom_id {
375                    return Err(VcsBackendError::Invalid(format!(
376                        "commit {commit} payload id {} did not match requested {atom_id}",
377                        atom.id
378                    )));
379                }
380                Ok(atom)
381            })
382            .collect()
383    }
384
385    fn export_git(
386        &self,
387        slice: &FlowSlice,
388        ref_name: &str,
389    ) -> Result<GitExportReceipt, VcsBackendError> {
390        let commit = self.commit_for_slice(slice)?;
391        self.update_ref(ref_name, &commit)?;
392        Ok(GitExportReceipt {
393            slice_id: slice.id,
394            commit,
395            ref_name: ref_name.to_string(),
396        })
397    }
398
399    fn import_git(&self, ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
400        validate_ref_name(&self.repo_root, ref_name)?;
401        let output = git_output_at(&self.repo_root, &["rev-list", "--reverse", ref_name], None)?;
402        let mut atoms = Vec::new();
403        for commit in output.lines().filter(|line| !line.trim().is_empty()) {
404            let atom = self.atom_from_commit(commit)?;
405            atoms.push(atom.id);
406        }
407        FlowSlice::new(atoms)
408    }
409}
410
/// Stand-in for the native Flow storage substrate planned for Phase 2.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct FlowNativeBackend;
414
415impl FlowNativeBackend {
416    pub fn new() -> Self {
417        Self
418    }
419
420    fn unsupported<T>(&self) -> Result<T, VcsBackendError> {
421        Err(VcsBackendError::Unsupported(
422            "FlowNativeBackend is deferred to Flow Phase 2".to_string(),
423        ))
424    }
425}
426
427impl VcsBackend for FlowNativeBackend {
428    fn emit_atom(&self, _atom: &Atom) -> Result<AtomRef, VcsBackendError> {
429        self.unsupported()
430    }
431
432    fn derive_slice(&self, _atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
433        self.unsupported()
434    }
435
436    fn ship_slice(&self, _slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
437        self.unsupported()
438    }
439
440    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
441        self.unsupported()
442    }
443
444    fn replay_slice(&self, _slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
445        self.unsupported()
446    }
447
448    fn export_git(
449        &self,
450        _slice: &FlowSlice,
451        _ref_name: &str,
452    ) -> Result<GitExportReceipt, VcsBackendError> {
453        self.unsupported()
454    }
455
456    fn import_git(&self, _ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
457        self.unsupported()
458    }
459}
460
461fn validate_ref_name(repo_root: &Path, ref_name: &str) -> Result<(), VcsBackendError> {
462    if ref_name.trim().is_empty() {
463        return Err(VcsBackendError::Invalid(
464            "git ref name must not be empty".to_string(),
465        ));
466    }
467    git_output_at(repo_root, &["check-ref-format", ref_name], None)?;
468    Ok(())
469}
470
471fn git_output_at(
472    repo_root: &Path,
473    args: &[&str],
474    stdin: Option<&[u8]>,
475) -> Result<String, VcsBackendError> {
476    let owned: Vec<String> = args.iter().map(|arg| (*arg).to_string()).collect();
477    git_output_at_owned(repo_root, &owned, stdin)
478}
479
480fn git_output_at_owned(
481    repo_root: &Path,
482    args: &[String],
483    stdin: Option<&[u8]>,
484) -> Result<String, VcsBackendError> {
485    let mut command = Command::new("git");
486    command.args(args).current_dir(repo_root);
487    clear_git_env(&mut command);
488    command
489        .env("GIT_AUTHOR_NAME", "Harn Flow")
490        .env("GIT_AUTHOR_EMAIL", "flow@harn.local")
491        .env("GIT_COMMITTER_NAME", "Harn Flow")
492        .env("GIT_COMMITTER_EMAIL", "flow@harn.local");
493    if stdin.is_some() {
494        command.stdin(Stdio::piped());
495    }
496    command.stdout(Stdio::piped()).stderr(Stdio::piped());
497    let mut child = command.spawn()?;
498    if let Some(input) = stdin {
499        let mut child_stdin = child
500            .stdin
501            .take()
502            .ok_or_else(|| VcsBackendError::Io("failed to open git stdin".to_string()))?;
503        use std::io::Write;
504        child_stdin.write_all(input)?;
505    }
506    let output = child.wait_with_output()?;
507    if !output.status.success() {
508        return Err(VcsBackendError::Git {
509            args: args.to_vec(),
510            status: output.status.code(),
511            stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
512        });
513    }
514    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
515}
516
/// Remove git's repository-location environment overrides from `command`.
///
/// Strips `GIT_DIR`, `GIT_WORK_TREE`, `GIT_COMMON_DIR`, `GIT_INDEX_FILE`, and
/// `GIT_PREFIX` from the environment the child process will receive.
fn clear_git_env(command: &mut Command) {
    for variable in [
        "GIT_DIR",
        "GIT_WORK_TREE",
        "GIT_COMMON_DIR",
        "GIT_INDEX_FILE",
        "GIT_PREFIX",
    ] {
        command.env_remove(variable);
    }
}