Skip to main content

harn_vm/flow/
backend.rs

1//! Harn Flow VCS backend abstraction.
2//!
3//! The Phase 0 backend is [`ShadowGitBackend`]: it stores every emitted atom as
4//! a git commit on a sidecar ref (`refs/flow/atoms/<atom-id>`) without touching
5//! the user worktree. Later phases can replace the storage substrate by
6//! implementing [`VcsBackend`] directly.
7
8use std::collections::HashSet;
9use std::fmt;
10use std::path::{Path, PathBuf};
11use std::process::{Command, Stdio};
12
13use serde::{Deserialize, Serialize};
14use sha2::{Digest, Sha256};
15
16use super::slice::SliceId;
17use super::{Atom, AtomError, AtomId};
18
/// Ref namespace holding one ref per emitted atom: `refs/flow/atoms/<atom-id>`.
const ATOM_REF_PREFIX: &str = "refs/flow/atoms";

/// Ref namespace holding one ref per shipped slice: `refs/flow/slices/<slice-id>`.
const SLICE_REF_PREFIX: &str = "refs/flow/slices";
21
22/// Errors produced by Flow VCS backends.
23#[derive(Debug)]
24pub enum VcsBackendError {
25    /// Backend configuration or caller input is invalid.
26    Invalid(String),
27    /// A requested atom, slice, commit, or ref is missing.
28    NotFound(String),
29    /// The backend intentionally does not implement this operation yet.
30    Unsupported(String),
31    /// Atom encoding, decoding, or validation failed.
32    Atom(AtomError),
33    /// JSON encoding or decoding failed.
34    Json(String),
35    /// A git command failed.
36    Git {
37        args: Vec<String>,
38        status: Option<i32>,
39        stderr: String,
40    },
41    /// The process could not be spawned or joined.
42    Io(String),
43}
44
45impl fmt::Display for VcsBackendError {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            VcsBackendError::Invalid(message) => write!(f, "vcs backend invalid: {message}"),
49            VcsBackendError::NotFound(message) => write!(f, "vcs backend not found: {message}"),
50            VcsBackendError::Unsupported(message) => {
51                write!(f, "vcs backend unsupported: {message}")
52            }
53            VcsBackendError::Atom(error) => write!(f, "{error}"),
54            VcsBackendError::Json(message) => write!(f, "vcs backend json error: {message}"),
55            VcsBackendError::Git {
56                args,
57                status,
58                stderr,
59            } => write!(
60                f,
61                "git {:?} failed with status {:?}: {}",
62                args,
63                status,
64                stderr.trim()
65            ),
66            VcsBackendError::Io(message) => write!(f, "vcs backend io error: {message}"),
67        }
68    }
69}
70
71impl std::error::Error for VcsBackendError {}
72
73impl From<AtomError> for VcsBackendError {
74    fn from(error: AtomError) -> Self {
75        Self::Atom(error)
76    }
77}
78
79impl From<serde_json::Error> for VcsBackendError {
80    fn from(error: serde_json::Error) -> Self {
81        Self::Json(error.to_string())
82    }
83}
84
85impl From<std::io::Error> for VcsBackendError {
86    fn from(error: std::io::Error) -> Self {
87        Self::Io(error.to_string())
88    }
89}
90
91/// A candidate shippable unit represented as an ordered atom closure.
92#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
93pub struct FlowSlice {
94    pub id: SliceId,
95    pub atoms: Vec<AtomId>,
96}
97
98impl FlowSlice {
99    /// Build a deterministic slice from an ordered atom set.
100    pub fn new(atoms: Vec<AtomId>) -> Result<Self, VcsBackendError> {
101        if atoms.is_empty() {
102            return Err(VcsBackendError::Invalid(
103                "slice must contain at least one atom".to_string(),
104            ));
105        }
106        let mut hasher = Sha256::new();
107        hasher.update(b"FSLI");
108        for atom in &atoms {
109            hasher.update(atom.0);
110        }
111        Ok(Self {
112            id: SliceId(hasher.finalize().into()),
113            atoms,
114        })
115    }
116}
117
118/// Location of an atom in a VCS backend.
119#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
120pub struct AtomRef {
121    pub atom_id: AtomId,
122    pub commit: String,
123    pub ref_name: String,
124}
125
126/// Receipt returned after a slice is made visible for shipping.
127#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
128pub struct ShipReceipt {
129    pub slice_id: SliceId,
130    pub commit: String,
131    pub ref_name: String,
132}
133
134/// Receipt returned after exporting a Flow slice into a git ref.
135#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
136pub struct GitExportReceipt {
137    pub slice_id: SliceId,
138    pub commit: String,
139    pub ref_name: String,
140}
141
142/// Storage and shipping abstraction for Harn Flow atoms and slices.
143pub trait VcsBackend {
144    /// Persist one atom and return its backend location.
145    fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError>;
146    /// Derive a shippable slice from an ordered atom set.
147    fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError>;
148    /// Publish a slice in the backend's native shipping surface.
149    fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError>;
150    /// List persisted atoms.
151    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError>;
152    /// Load atoms in the order recorded by a slice.
153    fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError>;
154    /// Export a slice into a git ref.
155    fn export_git(
156        &self,
157        slice: &FlowSlice,
158        ref_name: &str,
159    ) -> Result<GitExportReceipt, VcsBackendError>;
160    /// Import a git ref containing ShadowGit atom commits as a Flow slice.
161    fn import_git(&self, ref_name: &str) -> Result<FlowSlice, VcsBackendError>;
162}
163
164/// Git-backed Phase 0 Flow backend.
165#[derive(Clone, Debug, PartialEq, Eq)]
166pub struct ShadowGitBackend {
167    repo_root: PathBuf,
168}
169
170impl ShadowGitBackend {
171    /// Create a backend rooted at an existing git worktree.
172    pub fn new(repo_root: impl Into<PathBuf>) -> Result<Self, VcsBackendError> {
173        let repo_root = repo_root.into();
174        let output = git_output_at(&repo_root, &["rev-parse", "--show-toplevel"], None)?;
175        let canonical = PathBuf::from(output.trim());
176        Ok(Self {
177            repo_root: canonical,
178        })
179    }
180
181    /// The canonical git worktree root used for all commands.
182    pub fn repo_root(&self) -> &Path {
183        &self.repo_root
184    }
185
186    fn atom_ref_name(atom_id: AtomId) -> String {
187        format!("{ATOM_REF_PREFIX}/{atom_id}")
188    }
189
190    fn slice_ref_name(slice_id: SliceId) -> String {
191        format!("{SLICE_REF_PREFIX}/{slice_id}")
192    }
193
194    fn atom_commit(&self, atom_id: AtomId) -> Result<String, VcsBackendError> {
195        let ref_name = Self::atom_ref_name(atom_id);
196        git_output_at(
197            &self.repo_root,
198            &["rev-parse", &format!("{ref_name}^{{commit}}")],
199            None,
200        )
201        .map(|commit| commit.trim().to_string())
202        .map_err(|error| match error {
203            VcsBackendError::Git { .. } => {
204                VcsBackendError::NotFound(format!("atom {atom_id} has no ShadowGit ref"))
205            }
206            other => other,
207        })
208    }
209
210    fn atom_from_commit(&self, commit: &str) -> Result<Atom, VcsBackendError> {
211        let payload = git_output_at(
212            &self.repo_root,
213            &["show", &format!("{commit}:atom.json")],
214            None,
215        )
216        .map_err(|error| match error {
217            VcsBackendError::Git { .. } => VcsBackendError::NotFound(format!(
218                "commit {commit} does not contain a ShadowGit atom payload"
219            )),
220            other => other,
221        })?;
222        let atom = Atom::from_json_slice(payload.as_bytes())?;
223        Ok(atom)
224    }
225
226    fn commit_for_slice(&self, slice: &FlowSlice) -> Result<String, VcsBackendError> {
227        let tail = slice
228            .atoms
229            .last()
230            .copied()
231            .ok_or_else(|| VcsBackendError::Invalid("slice must contain atoms".to_string()))?;
232        self.atom_commit(tail)
233    }
234
235    fn append_atom_closure(
236        &self,
237        atom_id: AtomId,
238        seen: &mut HashSet<AtomId>,
239        out: &mut Vec<AtomId>,
240    ) -> Result<(), VcsBackendError> {
241        if !seen.insert(atom_id) {
242            return Ok(());
243        }
244
245        let commit = self.atom_commit(atom_id)?;
246        let atom = self.atom_from_commit(&commit)?;
247        if atom.id != atom_id {
248            return Err(VcsBackendError::Invalid(format!(
249                "commit {commit} payload id {} did not match requested {atom_id}",
250                atom.id
251            )));
252        }
253        for parent in &atom.parents {
254            self.append_atom_closure(*parent, seen, out)?;
255        }
256        out.push(atom_id);
257        Ok(())
258    }
259
260    fn update_ref(&self, ref_name: &str, commit: &str) -> Result<(), VcsBackendError> {
261        validate_ref_name(&self.repo_root, ref_name)?;
262        git_output_at(&self.repo_root, &["update-ref", ref_name, commit], None)?;
263        Ok(())
264    }
265}
266
267impl VcsBackend for ShadowGitBackend {
268    fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError> {
269        atom.verify()?;
270
271        let ref_name = Self::atom_ref_name(atom.id);
272        if let Ok(commit) = self.atom_commit(atom.id) {
273            return Ok(AtomRef {
274                atom_id: atom.id,
275                commit,
276                ref_name,
277            });
278        }
279
280        let payload = atom.to_json()?;
281        let blob = git_output_at(
282            &self.repo_root,
283            &["hash-object", "-w", "--stdin"],
284            Some(payload.as_bytes()),
285        )?;
286        let tree_input = format!("100644 blob {}\tatom.json\n", blob.trim());
287        let tree = git_output_at(&self.repo_root, &["mktree"], Some(tree_input.as_bytes()))?;
288
289        let mut commit_args = vec!["commit-tree".to_string(), tree.trim().to_string()];
290        for parent in &atom.parents {
291            let parent_commit = self.atom_commit(*parent)?;
292            commit_args.push("-p".to_string());
293            commit_args.push(parent_commit);
294        }
295        commit_args.push("-m".to_string());
296        commit_args.push(format!("flow atom {}", atom.id));
297
298        let commit = git_output_at_owned(&self.repo_root, &commit_args, None)?;
299        let commit = commit.trim().to_string();
300        self.update_ref(&ref_name, &commit)?;
301        Ok(AtomRef {
302            atom_id: atom.id,
303            commit,
304            ref_name,
305        })
306    }
307
308    fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
309        let mut seen = HashSet::new();
310        let mut closure = Vec::new();
311        for atom in atoms {
312            self.append_atom_closure(*atom, &mut seen, &mut closure)?;
313        }
314        FlowSlice::new(closure)
315    }
316
317    fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
318        let commit = self.commit_for_slice(slice)?;
319        let ref_name = Self::slice_ref_name(slice.id);
320        self.update_ref(&ref_name, &commit)?;
321        Ok(ShipReceipt {
322            slice_id: slice.id,
323            commit,
324            ref_name,
325        })
326    }
327
328    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
329        let output = git_output_at(
330            &self.repo_root,
331            &[
332                "for-each-ref",
333                "--format=%(refname) %(objectname)",
334                ATOM_REF_PREFIX,
335            ],
336            None,
337        )?;
338        let mut atoms = Vec::new();
339        for line in output.lines().filter(|line| !line.trim().is_empty()) {
340            let (ref_name, commit) = line
341                .split_once(' ')
342                .ok_or_else(|| VcsBackendError::Invalid(format!("malformed ref line: {line}")))?;
343            let raw_id = ref_name
344                .strip_prefix(&format!("{ATOM_REF_PREFIX}/"))
345                .ok_or_else(|| {
346                    VcsBackendError::Invalid(format!("unexpected atom ref {ref_name}"))
347                })?;
348            atoms.push(AtomRef {
349                atom_id: AtomId::from_hex(raw_id)?,
350                commit: commit.to_string(),
351                ref_name: ref_name.to_string(),
352            });
353        }
354        atoms.sort_by_key(|atom| atom.atom_id.0);
355        Ok(atoms)
356    }
357
358    fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
359        slice
360            .atoms
361            .iter()
362            .map(|atom_id| {
363                let commit = self.atom_commit(*atom_id)?;
364                let atom = self.atom_from_commit(&commit)?;
365                if atom.id != *atom_id {
366                    return Err(VcsBackendError::Invalid(format!(
367                        "commit {commit} payload id {} did not match requested {atom_id}",
368                        atom.id
369                    )));
370                }
371                Ok(atom)
372            })
373            .collect()
374    }
375
376    fn export_git(
377        &self,
378        slice: &FlowSlice,
379        ref_name: &str,
380    ) -> Result<GitExportReceipt, VcsBackendError> {
381        let commit = self.commit_for_slice(slice)?;
382        self.update_ref(ref_name, &commit)?;
383        Ok(GitExportReceipt {
384            slice_id: slice.id,
385            commit,
386            ref_name: ref_name.to_string(),
387        })
388    }
389
390    fn import_git(&self, ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
391        validate_ref_name(&self.repo_root, ref_name)?;
392        let output = git_output_at(&self.repo_root, &["rev-list", "--reverse", ref_name], None)?;
393        let mut atoms = Vec::new();
394        for commit in output.lines().filter(|line| !line.trim().is_empty()) {
395            let atom = self.atom_from_commit(commit)?;
396            atoms.push(atom.id);
397        }
398        FlowSlice::new(atoms)
399    }
400}
401
402/// Placeholder for the Phase 2 native Flow substrate.
403#[derive(Clone, Debug, Default, PartialEq, Eq)]
404pub struct FlowNativeBackend;
405
406impl FlowNativeBackend {
407    pub fn new() -> Self {
408        Self
409    }
410
411    fn unsupported<T>(&self) -> Result<T, VcsBackendError> {
412        Err(VcsBackendError::Unsupported(
413            "FlowNativeBackend is deferred to Flow Phase 2".to_string(),
414        ))
415    }
416}
417
418impl VcsBackend for FlowNativeBackend {
419    fn emit_atom(&self, _atom: &Atom) -> Result<AtomRef, VcsBackendError> {
420        self.unsupported()
421    }
422
423    fn derive_slice(&self, _atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
424        self.unsupported()
425    }
426
427    fn ship_slice(&self, _slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
428        self.unsupported()
429    }
430
431    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
432        self.unsupported()
433    }
434
435    fn replay_slice(&self, _slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
436        self.unsupported()
437    }
438
439    fn export_git(
440        &self,
441        _slice: &FlowSlice,
442        _ref_name: &str,
443    ) -> Result<GitExportReceipt, VcsBackendError> {
444        self.unsupported()
445    }
446
447    fn import_git(&self, _ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
448        self.unsupported()
449    }
450}
451
452fn validate_ref_name(repo_root: &Path, ref_name: &str) -> Result<(), VcsBackendError> {
453    if ref_name.trim().is_empty() {
454        return Err(VcsBackendError::Invalid(
455            "git ref name must not be empty".to_string(),
456        ));
457    }
458    git_output_at(repo_root, &["check-ref-format", ref_name], None)?;
459    Ok(())
460}
461
462fn git_output_at(
463    repo_root: &Path,
464    args: &[&str],
465    stdin: Option<&[u8]>,
466) -> Result<String, VcsBackendError> {
467    let owned: Vec<String> = args.iter().map(|arg| (*arg).to_string()).collect();
468    git_output_at_owned(repo_root, &owned, stdin)
469}
470
471fn git_output_at_owned(
472    repo_root: &Path,
473    args: &[String],
474    stdin: Option<&[u8]>,
475) -> Result<String, VcsBackendError> {
476    let mut command = Command::new("git");
477    command.args(args).current_dir(repo_root);
478    clear_git_env(&mut command);
479    command
480        .env("GIT_AUTHOR_NAME", "Harn Flow")
481        .env("GIT_AUTHOR_EMAIL", "flow@harn.local")
482        .env("GIT_COMMITTER_NAME", "Harn Flow")
483        .env("GIT_COMMITTER_EMAIL", "flow@harn.local");
484    if stdin.is_some() {
485        command.stdin(Stdio::piped());
486    }
487    command.stdout(Stdio::piped()).stderr(Stdio::piped());
488    let mut child = command.spawn()?;
489    if let Some(input) = stdin {
490        let mut child_stdin = child
491            .stdin
492            .take()
493            .ok_or_else(|| VcsBackendError::Io("failed to open git stdin".to_string()))?;
494        use std::io::Write;
495        child_stdin.write_all(input)?;
496    }
497    let output = child.wait_with_output()?;
498    if !output.status.success() {
499        return Err(VcsBackendError::Git {
500            args: args.to_vec(),
501            status: output.status.code(),
502            stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
503        });
504    }
505    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
506}
507
/// Removes inherited environment variables that would redirect git away from
/// the repository at the command's working directory.
///
/// Such variables can leak in when Flow is launched from a git hook or another
/// git-driven process. The object-store overrides matter as much as `GIT_DIR`.
/// If `GIT_OBJECT_DIRECTORY` (or env-provided alternates) leaked through,
/// `hash-object`/`commit-tree` would write objects outside the repository,
/// while `update-ref` pointed that repository's refs at them.
fn clear_git_env(command: &mut Command) {
    command
        .env_remove("GIT_DIR")
        .env_remove("GIT_WORK_TREE")
        .env_remove("GIT_COMMON_DIR")
        .env_remove("GIT_INDEX_FILE")
        .env_remove("GIT_PREFIX")
        .env_remove("GIT_OBJECT_DIRECTORY")
        .env_remove("GIT_ALTERNATE_OBJECT_DIRECTORIES");
}