1use std::collections::HashSet;
9use std::fmt;
10use std::path::{Path, PathBuf};
11use std::process::{Command, Stdio};
12
13use serde::{Deserialize, Serialize};
14use sha2::{Digest, Sha256};
15
16use super::slice::SliceId;
17use super::{Atom, AtomError, AtomId};
18
/// Ref namespace under which each atom gets one ref, named `<prefix>/<atom id>`.
const ATOM_REF_PREFIX: &str = "refs/flow/atoms";
/// Ref namespace under which each shipped slice gets one ref, named `<prefix>/<slice id>`.
const SLICE_REF_PREFIX: &str = "refs/flow/slices";
21
22#[derive(Debug)]
24pub enum VcsBackendError {
25 Invalid(String),
27 NotFound(String),
29 Unsupported(String),
31 Atom(AtomError),
33 Json(String),
35 Git {
37 args: Vec<String>,
38 status: Option<i32>,
39 stderr: String,
40 },
41 Io(String),
43}
44
45impl fmt::Display for VcsBackendError {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 VcsBackendError::Invalid(message) => write!(f, "vcs backend invalid: {message}"),
49 VcsBackendError::NotFound(message) => write!(f, "vcs backend not found: {message}"),
50 VcsBackendError::Unsupported(message) => {
51 write!(f, "vcs backend unsupported: {message}")
52 }
53 VcsBackendError::Atom(error) => write!(f, "{error}"),
54 VcsBackendError::Json(message) => write!(f, "vcs backend json error: {message}"),
55 VcsBackendError::Git {
56 args,
57 status,
58 stderr,
59 } => write!(
60 f,
61 "git {:?} failed with status {:?}: {}",
62 args,
63 status,
64 stderr.trim()
65 ),
66 VcsBackendError::Io(message) => write!(f, "vcs backend io error: {message}"),
67 }
68 }
69}
70
71impl std::error::Error for VcsBackendError {}
72
73impl From<AtomError> for VcsBackendError {
74 fn from(error: AtomError) -> Self {
75 Self::Atom(error)
76 }
77}
78
79impl From<serde_json::Error> for VcsBackendError {
80 fn from(error: serde_json::Error) -> Self {
81 Self::Json(error.to_string())
82 }
83}
84
85impl From<std::io::Error> for VcsBackendError {
86 fn from(error: std::io::Error) -> Self {
87 Self::Io(error.to_string())
88 }
89}
90
91#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
93pub struct FlowSlice {
94 pub id: SliceId,
95 pub atoms: Vec<AtomId>,
96}
97
98impl FlowSlice {
99 pub fn new(atoms: Vec<AtomId>) -> Result<Self, VcsBackendError> {
101 if atoms.is_empty() {
102 return Err(VcsBackendError::Invalid(
103 "slice must contain at least one atom".to_string(),
104 ));
105 }
106 let mut hasher = Sha256::new();
107 hasher.update(b"FSLI");
108 for atom in &atoms {
109 hasher.update(atom.0);
110 }
111 Ok(Self {
112 id: SliceId(hasher.finalize().into()),
113 atoms,
114 })
115 }
116}
117
118#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
120pub struct AtomRef {
121 pub atom_id: AtomId,
122 pub commit: String,
123 pub ref_name: String,
124}
125
126#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
128pub struct ShipReceipt {
129 pub slice_id: SliceId,
130 pub commit: String,
131 pub ref_name: String,
132}
133
134#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
136pub struct GitExportReceipt {
137 pub slice_id: SliceId,
138 pub commit: String,
139 pub ref_name: String,
140}
141
142pub trait VcsBackend {
144 fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError>;
146 fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError>;
148 fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError>;
150 fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError>;
152 fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError>;
154 fn export_git(
156 &self,
157 slice: &FlowSlice,
158 ref_name: &str,
159 ) -> Result<GitExportReceipt, VcsBackendError>;
160 fn import_git(&self, ref_name: &str) -> Result<FlowSlice, VcsBackendError>;
162}
163
/// [`VcsBackend`] that keeps atoms as commits inside an existing Git repository.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShadowGitBackend {
    /// Directory every `git` command of this backend is run from.
    repo_root: PathBuf,
}
169
170impl ShadowGitBackend {
171 pub fn new(repo_root: impl Into<PathBuf>) -> Result<Self, VcsBackendError> {
173 let repo_root = repo_root.into();
174 let output = git_output_at(&repo_root, &["rev-parse", "--show-toplevel"], None)?;
175 let canonical = PathBuf::from(output.trim());
176 Ok(Self {
177 repo_root: canonical,
178 })
179 }
180
181 pub fn repo_root(&self) -> &Path {
183 &self.repo_root
184 }
185
186 fn atom_ref_name(atom_id: AtomId) -> String {
187 format!("{ATOM_REF_PREFIX}/{atom_id}")
188 }
189
190 fn slice_ref_name(slice_id: SliceId) -> String {
191 format!("{SLICE_REF_PREFIX}/{slice_id}")
192 }
193
194 fn atom_commit(&self, atom_id: AtomId) -> Result<String, VcsBackendError> {
195 let ref_name = Self::atom_ref_name(atom_id);
196 git_output_at(
197 &self.repo_root,
198 &["rev-parse", &format!("{ref_name}^{{commit}}")],
199 None,
200 )
201 .map(|commit| commit.trim().to_string())
202 .map_err(|error| match error {
203 VcsBackendError::Git { .. } => {
204 VcsBackendError::NotFound(format!("atom {atom_id} has no ShadowGit ref"))
205 }
206 other => other,
207 })
208 }
209
210 fn atom_from_commit(&self, commit: &str) -> Result<Atom, VcsBackendError> {
211 let payload = git_output_at(
212 &self.repo_root,
213 &["show", &format!("{commit}:atom.json")],
214 None,
215 )
216 .map_err(|error| match error {
217 VcsBackendError::Git { .. } => VcsBackendError::NotFound(format!(
218 "commit {commit} does not contain a ShadowGit atom payload"
219 )),
220 other => other,
221 })?;
222 let atom = Atom::from_json_slice(payload.as_bytes())?;
223 Ok(atom)
224 }
225
226 fn commit_for_slice(&self, slice: &FlowSlice) -> Result<String, VcsBackendError> {
227 let tail = slice
228 .atoms
229 .last()
230 .copied()
231 .ok_or_else(|| VcsBackendError::Invalid("slice must contain atoms".to_string()))?;
232 self.atom_commit(tail)
233 }
234
235 fn append_atom_closure(
236 &self,
237 atom_id: AtomId,
238 seen: &mut HashSet<AtomId>,
239 out: &mut Vec<AtomId>,
240 ) -> Result<(), VcsBackendError> {
241 if !seen.insert(atom_id) {
242 return Ok(());
243 }
244
245 let commit = self.atom_commit(atom_id)?;
246 let atom = self.atom_from_commit(&commit)?;
247 if atom.id != atom_id {
248 return Err(VcsBackendError::Invalid(format!(
249 "commit {commit} payload id {} did not match requested {atom_id}",
250 atom.id
251 )));
252 }
253 for parent in &atom.parents {
254 self.append_atom_closure(*parent, seen, out)?;
255 }
256 out.push(atom_id);
257 Ok(())
258 }
259
260 fn update_ref(&self, ref_name: &str, commit: &str) -> Result<(), VcsBackendError> {
261 validate_ref_name(&self.repo_root, ref_name)?;
262 git_output_at(&self.repo_root, &["update-ref", ref_name, commit], None)?;
263 Ok(())
264 }
265}
266
267impl VcsBackend for ShadowGitBackend {
268 fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError> {
269 atom.verify()?;
270
271 let ref_name = Self::atom_ref_name(atom.id);
272 if let Ok(commit) = self.atom_commit(atom.id) {
273 return Ok(AtomRef {
274 atom_id: atom.id,
275 commit,
276 ref_name,
277 });
278 }
279
280 let payload = atom.to_json()?;
281 let blob = git_output_at(
282 &self.repo_root,
283 &["hash-object", "-w", "--stdin"],
284 Some(payload.as_bytes()),
285 )?;
286 let tree_input = format!("100644 blob {}\tatom.json\n", blob.trim());
287 let tree = git_output_at(&self.repo_root, &["mktree"], Some(tree_input.as_bytes()))?;
288
289 let mut commit_args = vec!["commit-tree".to_string(), tree.trim().to_string()];
290 for parent in &atom.parents {
291 let parent_commit = self.atom_commit(*parent)?;
292 commit_args.push("-p".to_string());
293 commit_args.push(parent_commit);
294 }
295 commit_args.push("-m".to_string());
296 commit_args.push(format!("flow atom {}", atom.id));
297
298 let commit = git_output_at_owned(&self.repo_root, &commit_args, None)?;
299 let commit = commit.trim().to_string();
300 self.update_ref(&ref_name, &commit)?;
301 Ok(AtomRef {
302 atom_id: atom.id,
303 commit,
304 ref_name,
305 })
306 }
307
308 fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
309 let mut seen = HashSet::new();
310 let mut closure = Vec::new();
311 for atom in atoms {
312 self.append_atom_closure(*atom, &mut seen, &mut closure)?;
313 }
314 FlowSlice::new(closure)
315 }
316
317 fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
318 let commit = self.commit_for_slice(slice)?;
319 let ref_name = Self::slice_ref_name(slice.id);
320 self.update_ref(&ref_name, &commit)?;
321 Ok(ShipReceipt {
322 slice_id: slice.id,
323 commit,
324 ref_name,
325 })
326 }
327
328 fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
329 let output = git_output_at(
330 &self.repo_root,
331 &[
332 "for-each-ref",
333 "--format=%(refname) %(objectname)",
334 ATOM_REF_PREFIX,
335 ],
336 None,
337 )?;
338 let mut atoms = Vec::new();
339 for line in output.lines().filter(|line| !line.trim().is_empty()) {
340 let (ref_name, commit) = line
341 .split_once(' ')
342 .ok_or_else(|| VcsBackendError::Invalid(format!("malformed ref line: {line}")))?;
343 let raw_id = ref_name
344 .strip_prefix(&format!("{ATOM_REF_PREFIX}/"))
345 .ok_or_else(|| {
346 VcsBackendError::Invalid(format!("unexpected atom ref {ref_name}"))
347 })?;
348 atoms.push(AtomRef {
349 atom_id: AtomId::from_hex(raw_id)?,
350 commit: commit.to_string(),
351 ref_name: ref_name.to_string(),
352 });
353 }
354 atoms.sort_by_key(|atom| atom.atom_id.0);
355 Ok(atoms)
356 }
357
358 fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
359 slice
360 .atoms
361 .iter()
362 .map(|atom_id| {
363 let commit = self.atom_commit(*atom_id)?;
364 let atom = self.atom_from_commit(&commit)?;
365 if atom.id != *atom_id {
366 return Err(VcsBackendError::Invalid(format!(
367 "commit {commit} payload id {} did not match requested {atom_id}",
368 atom.id
369 )));
370 }
371 Ok(atom)
372 })
373 .collect()
374 }
375
376 fn export_git(
377 &self,
378 slice: &FlowSlice,
379 ref_name: &str,
380 ) -> Result<GitExportReceipt, VcsBackendError> {
381 let commit = self.commit_for_slice(slice)?;
382 self.update_ref(ref_name, &commit)?;
383 Ok(GitExportReceipt {
384 slice_id: slice.id,
385 commit,
386 ref_name: ref_name.to_string(),
387 })
388 }
389
390 fn import_git(&self, ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
391 validate_ref_name(&self.repo_root, ref_name)?;
392 let output = git_output_at(&self.repo_root, &["rev-list", "--reverse", ref_name], None)?;
393 let mut atoms = Vec::new();
394 for commit in output.lines().filter(|line| !line.trim().is_empty()) {
395 let atom = self.atom_from_commit(commit)?;
396 atoms.push(atom.id);
397 }
398 FlowSlice::new(atoms)
399 }
400}
401
/// Placeholder backend whose [`VcsBackend`] operations all report "unsupported".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FlowNativeBackend;
405
406impl FlowNativeBackend {
407 pub fn new() -> Self {
408 Self
409 }
410
411 fn unsupported<T>(&self) -> Result<T, VcsBackendError> {
412 Err(VcsBackendError::Unsupported(
413 "FlowNativeBackend is deferred to Flow Phase 2".to_string(),
414 ))
415 }
416}
417
418impl VcsBackend for FlowNativeBackend {
419 fn emit_atom(&self, _atom: &Atom) -> Result<AtomRef, VcsBackendError> {
420 self.unsupported()
421 }
422
423 fn derive_slice(&self, _atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
424 self.unsupported()
425 }
426
427 fn ship_slice(&self, _slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
428 self.unsupported()
429 }
430
431 fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
432 self.unsupported()
433 }
434
435 fn replay_slice(&self, _slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
436 self.unsupported()
437 }
438
439 fn export_git(
440 &self,
441 _slice: &FlowSlice,
442 _ref_name: &str,
443 ) -> Result<GitExportReceipt, VcsBackendError> {
444 self.unsupported()
445 }
446
447 fn import_git(&self, _ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
448 self.unsupported()
449 }
450}
451
452fn validate_ref_name(repo_root: &Path, ref_name: &str) -> Result<(), VcsBackendError> {
453 if ref_name.trim().is_empty() {
454 return Err(VcsBackendError::Invalid(
455 "git ref name must not be empty".to_string(),
456 ));
457 }
458 git_output_at(repo_root, &["check-ref-format", ref_name], None)?;
459 Ok(())
460}
461
462fn git_output_at(
463 repo_root: &Path,
464 args: &[&str],
465 stdin: Option<&[u8]>,
466) -> Result<String, VcsBackendError> {
467 let owned: Vec<String> = args.iter().map(|arg| (*arg).to_string()).collect();
468 git_output_at_owned(repo_root, &owned, stdin)
469}
470
471fn git_output_at_owned(
472 repo_root: &Path,
473 args: &[String],
474 stdin: Option<&[u8]>,
475) -> Result<String, VcsBackendError> {
476 let mut command = Command::new("git");
477 command.args(args).current_dir(repo_root);
478 clear_git_env(&mut command);
479 command
480 .env("GIT_AUTHOR_NAME", "Harn Flow")
481 .env("GIT_AUTHOR_EMAIL", "flow@harn.local")
482 .env("GIT_COMMITTER_NAME", "Harn Flow")
483 .env("GIT_COMMITTER_EMAIL", "flow@harn.local");
484 if stdin.is_some() {
485 command.stdin(Stdio::piped());
486 }
487 command.stdout(Stdio::piped()).stderr(Stdio::piped());
488 let mut child = command.spawn()?;
489 if let Some(input) = stdin {
490 let mut child_stdin = child
491 .stdin
492 .take()
493 .ok_or_else(|| VcsBackendError::Io("failed to open git stdin".to_string()))?;
494 use std::io::Write;
495 child_stdin.write_all(input)?;
496 }
497 let output = child.wait_with_output()?;
498 if !output.status.success() {
499 return Err(VcsBackendError::Git {
500 args: args.to_vec(),
501 status: output.status.code(),
502 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
503 });
504 }
505 Ok(String::from_utf8_lossy(&output.stdout).into_owned())
506}
507
/// Removes inherited Git variables that would redirect `command` to another repository.
///
/// When this process is itself started by Git (for example from a hook), variables such
/// as `GIT_DIR` or `GIT_INDEX_FILE` are present in the environment and would override the
/// repository selected through the command's working directory. The object-store
/// variables are cleared for the same reason: with `GIT_DIR` removed but
/// `GIT_OBJECT_DIRECTORY` still set, objects would be written to a foreign object store.
fn clear_git_env(command: &mut Command) {
    const REPO_LOCAL_VARS: [&str; 7] = [
        "GIT_DIR",
        "GIT_WORK_TREE",
        "GIT_COMMON_DIR",
        "GIT_INDEX_FILE",
        "GIT_PREFIX",
        "GIT_OBJECT_DIRECTORY",
        "GIT_ALTERNATE_OBJECT_DIRECTORIES",
    ];
    for name in REPO_LOCAL_VARS {
        command.env_remove(name);
    }
}