#![warn(missing_docs)]
use anyhow::{Context, Result};
use shiplog_bundle::{DIR_PROFILES, FILE_PACKET_MD, RunArtifactPaths, zip_path_for_profile};
use shiplog_bundle::{write_bundle_manifest, write_zip};
pub use shiplog_merge::ConflictResolution;
use shiplog_ports::{IngestOutput, Redactor, Renderer, WorkstreamClusterer};
use shiplog_render_json::{write_coverage_manifest, write_events_jsonl};
use shiplog_schema::bundle::BundleProfile;
use shiplog_schema::coverage::CoverageManifest;
use shiplog_schema::event::EventEnvelope;
use shiplog_schema::workstream::WorkstreamsFile;
use shiplog_workstreams::WorkstreamManager;
use std::path::{Path, PathBuf};
pub struct Engine<'a> {
pub renderer: &'a dyn Renderer,
pub clusterer: &'a dyn WorkstreamClusterer,
pub redactor: &'a dyn Redactor,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RunOutputs {
pub out_dir: PathBuf,
pub packet_md: PathBuf,
pub workstreams_yaml: PathBuf,
pub ledger_events_jsonl: PathBuf,
pub coverage_manifest_json: PathBuf,
pub bundle_manifest_json: PathBuf,
pub zip_path: Option<PathBuf>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WorkstreamSource {
Curated,
Suggested,
Generated,
}
impl std::fmt::Display for WorkstreamSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Curated => f.write_str("Curated"),
Self::Suggested => f.write_str("Suggested"),
Self::Generated => f.write_str("Generated"),
}
}
}
impl<'a> Engine<'a> {
pub fn new(
renderer: &'a dyn Renderer,
clusterer: &'a dyn WorkstreamClusterer,
redactor: &'a dyn Redactor,
) -> Self {
Self {
renderer,
clusterer,
redactor,
}
}
pub fn run(
&self,
ingest: IngestOutput,
user: &str,
window_label: &str,
out_dir: &Path,
zip: bool,
bundle_profile: &BundleProfile,
) -> Result<(RunOutputs, WorkstreamSource)> {
std::fs::create_dir_all(out_dir).with_context(|| format!("create {out_dir:?}"))?;
let events = ingest.events;
let coverage = ingest.coverage;
let paths = RunArtifactPaths::new(out_dir);
let (workstreams, ws_source) = self
.load_workstreams(out_dir, &events)
.context("load workstreams")?;
let ledger_path = paths.ledger_events();
let coverage_path = paths.coverage_manifest();
let packet_path = paths.packet_md();
write_events_jsonl(&ledger_path, &events)
.with_context(|| format!("write event ledger to {ledger_path:?}"))?;
write_coverage_manifest(&coverage_path, &coverage)
.with_context(|| format!("write coverage manifest to {coverage_path:?}"))?;
let ws_path = match ws_source {
WorkstreamSource::Curated => WorkstreamManager::curated_path(out_dir),
WorkstreamSource::Suggested => WorkstreamManager::suggested_path(out_dir),
WorkstreamSource::Generated => WorkstreamManager::suggested_path(out_dir),
};
let packet = self
.renderer
.render_packet_markdown(user, window_label, &events, &workstreams, &coverage)
.context("render packet markdown")?;
std::fs::write(&packet_path, &packet)
.with_context(|| format!("write packet to {packet_path:?}"))?;
self.render_profile(
"manager",
user,
window_label,
out_dir,
&events,
&workstreams,
&coverage,
)
.context("render manager profile")?;
self.render_profile(
"public",
user,
window_label,
out_dir,
&events,
&workstreams,
&coverage,
)
.context("render public profile")?;
let run_id = &coverage.run_id;
let _bundle = write_bundle_manifest(out_dir, run_id, bundle_profile)
.context("write bundle manifest")?;
let zip_path = if zip {
let z = zip_path_for_profile(out_dir, bundle_profile.as_str());
write_zip(out_dir, &z, bundle_profile).context("write zip archive")?;
Some(z)
} else {
None
};
Ok((
RunOutputs {
out_dir: out_dir.to_path_buf(),
packet_md: packet_path,
workstreams_yaml: ws_path,
ledger_events_jsonl: ledger_path,
coverage_manifest_json: coverage_path,
bundle_manifest_json: paths.bundle_manifest(),
zip_path,
},
ws_source,
))
}
fn load_workstreams(
&self,
out_dir: &Path,
events: &[EventEnvelope],
) -> Result<(WorkstreamsFile, WorkstreamSource)> {
let curated_exists = WorkstreamManager::has_curated(out_dir);
let suggested_exists = WorkstreamManager::suggested_path(out_dir).exists();
let ws = WorkstreamManager::load_effective(out_dir, self.clusterer, events)
.context("load effective workstreams")?;
let source = if curated_exists {
WorkstreamSource::Curated
} else if suggested_exists {
WorkstreamSource::Suggested
} else {
WorkstreamSource::Generated
};
Ok((ws, source))
}
#[allow(clippy::too_many_arguments)]
pub fn import(
&self,
ingest: IngestOutput,
user: &str,
window_label: &str,
out_dir: &Path,
zip: bool,
workstreams: Option<WorkstreamsFile>,
bundle_profile: &BundleProfile,
) -> Result<(RunOutputs, WorkstreamSource)> {
std::fs::create_dir_all(out_dir).with_context(|| format!("create {out_dir:?}"))?;
let events = ingest.events;
let coverage = ingest.coverage;
let paths = RunArtifactPaths::new(out_dir);
let (ws, ws_source) = if let Some(ws) = workstreams {
let curated_path = WorkstreamManager::curated_path(out_dir);
shiplog_workstreams::write_workstreams(&curated_path, &ws)
.with_context(|| format!("write curated workstreams to {curated_path:?}"))?;
(ws, WorkstreamSource::Curated)
} else {
self.load_workstreams(out_dir, &events)
.context("load workstreams")?
};
let ledger_path = paths.ledger_events();
let coverage_path = paths.coverage_manifest();
let packet_path = paths.packet_md();
write_events_jsonl(&ledger_path, &events)
.with_context(|| format!("write event ledger to {ledger_path:?}"))?;
write_coverage_manifest(&coverage_path, &coverage)
.with_context(|| format!("write coverage manifest to {coverage_path:?}"))?;
let ws_path = match ws_source {
WorkstreamSource::Curated => WorkstreamManager::curated_path(out_dir),
WorkstreamSource::Suggested => WorkstreamManager::suggested_path(out_dir),
WorkstreamSource::Generated => WorkstreamManager::suggested_path(out_dir),
};
let packet = self
.renderer
.render_packet_markdown(user, window_label, &events, &ws, &coverage)
.context("render packet markdown")?;
std::fs::write(&packet_path, &packet)
.with_context(|| format!("write packet to {packet_path:?}"))?;
self.render_profile(
"manager",
user,
window_label,
out_dir,
&events,
&ws,
&coverage,
)
.context("render manager profile")?;
self.render_profile(
"public",
user,
window_label,
out_dir,
&events,
&ws,
&coverage,
)
.context("render public profile")?;
let run_id = &coverage.run_id;
let _bundle = write_bundle_manifest(out_dir, run_id, bundle_profile)
.context("write bundle manifest")?;
let zip_path = if zip {
let z = zip_path_for_profile(out_dir, bundle_profile.as_str());
write_zip(out_dir, &z, bundle_profile).context("write zip archive")?;
Some(z)
} else {
None
};
Ok((
RunOutputs {
out_dir: out_dir.to_path_buf(),
packet_md: packet_path,
workstreams_yaml: ws_path,
ledger_events_jsonl: ledger_path,
coverage_manifest_json: coverage_path,
bundle_manifest_json: paths.bundle_manifest(),
zip_path,
},
ws_source,
))
}
pub fn refresh(
&self,
ingest: IngestOutput,
user: &str,
window_label: &str,
out_dir: &Path,
zip: bool,
bundle_profile: &BundleProfile,
) -> Result<RunOutputs> {
std::fs::create_dir_all(out_dir).with_context(|| format!("create {out_dir:?}"))?;
let events = ingest.events;
let coverage = ingest.coverage;
let paths = RunArtifactPaths::new(out_dir);
let workstreams = if WorkstreamManager::has_curated(out_dir) {
let path = WorkstreamManager::curated_path(out_dir);
let text = std::fs::read_to_string(&path)
.with_context(|| format!("read curated workstreams from {path:?}"))?;
serde_yaml::from_str(&text)
.with_context(|| format!("parse curated workstreams yaml {path:?}"))?
} else {
let suggested_path = WorkstreamManager::suggested_path(out_dir);
if suggested_path.exists() {
let text = std::fs::read_to_string(&suggested_path).with_context(|| {
format!("read suggested workstreams from {suggested_path:?}")
})?;
serde_yaml::from_str(&text).with_context(|| {
format!("parse suggested workstreams yaml {suggested_path:?}")
})?
} else {
anyhow::bail!(
"No workstreams found. Run `shiplog collect` first to generate workstreams."
);
}
};
let ledger_path = paths.ledger_events();
let coverage_path = paths.coverage_manifest();
let packet_path = paths.packet_md();
write_events_jsonl(&ledger_path, &events)
.with_context(|| format!("write event ledger to {ledger_path:?}"))?;
write_coverage_manifest(&coverage_path, &coverage)
.with_context(|| format!("write coverage manifest to {coverage_path:?}"))?;
let ws_path = if WorkstreamManager::has_curated(out_dir) {
WorkstreamManager::curated_path(out_dir)
} else {
WorkstreamManager::suggested_path(out_dir)
};
let packet = self
.renderer
.render_packet_markdown(user, window_label, &events, &workstreams, &coverage)
.context("render packet markdown")?;
std::fs::write(&packet_path, &packet)
.with_context(|| format!("write packet to {packet_path:?}"))?;
self.render_profile(
"manager",
user,
window_label,
out_dir,
&events,
&workstreams,
&coverage,
)
.context("render manager profile")?;
self.render_profile(
"public",
user,
window_label,
out_dir,
&events,
&workstreams,
&coverage,
)
.context("render public profile")?;
let run_id = &coverage.run_id;
let _bundle = write_bundle_manifest(out_dir, run_id, bundle_profile)
.context("write bundle manifest")?;
let zip_path = if zip {
let z = zip_path_for_profile(out_dir, bundle_profile.as_str());
write_zip(out_dir, &z, bundle_profile).context("write zip archive")?;
Some(z)
} else {
None
};
Ok(RunOutputs {
out_dir: out_dir.to_path_buf(),
packet_md: packet_path,
workstreams_yaml: ws_path,
ledger_events_jsonl: ledger_path,
coverage_manifest_json: coverage_path,
bundle_manifest_json: paths.bundle_manifest(),
zip_path,
})
}
#[allow(clippy::too_many_arguments)]
fn render_profile(
&self,
profile: &str,
user: &str,
window_label: &str,
out_dir: &Path,
events: &[EventEnvelope],
workstreams: &WorkstreamsFile,
coverage: &CoverageManifest,
) -> Result<()> {
let prof_dir = out_dir.join(DIR_PROFILES).join(profile);
std::fs::create_dir_all(&prof_dir)
.with_context(|| format!("create profile directory {prof_dir:?}"))?;
let red_events = self
.redactor
.redact_events(events, profile)
.with_context(|| format!("redact events for {profile} profile"))?;
let red_ws = self
.redactor
.redact_workstreams(workstreams, profile)
.with_context(|| format!("redact workstreams for {profile} profile"))?;
let md = self
.renderer
.render_packet_markdown(user, window_label, &red_events, &red_ws, coverage)
.with_context(|| format!("render {profile} packet markdown"))?;
std::fs::write(prof_dir.join(FILE_PACKET_MD), &md)
.with_context(|| format!("write {profile} packet to {prof_dir:?}"))?;
Ok(())
}
pub fn merge(
&self,
ingest_outputs: Vec<IngestOutput>,
resolution: ConflictResolution,
) -> Result<IngestOutput> {
#[cfg(feature = "merge-pipeline")]
{
let merged = shiplog_merge::merge_ingest_outputs(&ingest_outputs, resolution)
.context("merge ingest outputs")?;
Ok(merged.ingest_output)
}
#[cfg(not(feature = "merge-pipeline"))]
{
shiplog_merge::merge_ingest_outputs_legacy(&ingest_outputs, resolution)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{NaiveDate, TimeZone, Utc};
use shiplog_bundle::{PROFILE_MANAGER, PROFILE_PUBLIC};
use shiplog_ids::{EventId, RunId};
use shiplog_ports::IngestOutput;
use shiplog_schema::coverage::{Completeness, CoverageManifest, TimeWindow};
use shiplog_schema::event::*;
use shiplog_workstreams::RepoClusterer;
fn pr_event(repo: &str, number: u64, title: &str) -> EventEnvelope {
EventEnvelope {
id: EventId::from_parts(["github", "pr", repo, &number.to_string()]),
kind: EventKind::PullRequest,
occurred_at: Utc.timestamp_opt(0, 0).unwrap(),
actor: Actor {
login: "user".into(),
id: None,
},
repo: RepoRef {
full_name: repo.to_string(),
html_url: Some(format!("https://github.com/{repo}")),
visibility: RepoVisibility::Unknown,
},
payload: EventPayload::PullRequest(PullRequestEvent {
number,
title: title.to_string(),
state: PullRequestState::Merged,
created_at: Utc.timestamp_opt(0, 0).unwrap(),
merged_at: Some(Utc.timestamp_opt(0, 0).unwrap()),
additions: Some(1),
deletions: Some(0),
changed_files: Some(1),
touched_paths_hint: vec![],
window: Some(TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
}),
}),
tags: vec![],
links: vec![Link {
label: "pr".into(),
url: format!("https://github.com/{repo}/pull/{number}"),
}],
source: SourceRef {
system: SourceSystem::Github,
url: Some("https://api.github.com/...".into()),
opaque_id: None,
},
}
}
fn test_ingest() -> IngestOutput {
let events = vec![
pr_event("acme/foo", 1, "Add feature"),
pr_event("acme/foo", 2, "Fix bug"),
];
let coverage = CoverageManifest {
run_id: RunId("test_run_1".into()),
generated_at: Utc.timestamp_opt(0, 0).unwrap(),
user: "tester".into(),
window: TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
},
mode: "merged".into(),
sources: vec!["github".into()],
slices: vec![],
warnings: vec![],
completeness: Completeness::Complete,
};
IngestOutput { events, coverage }
}
fn test_engine() -> Engine<'static> {
let renderer: &'static dyn shiplog_ports::Renderer =
Box::leak(Box::new(shiplog_render_md::MarkdownRenderer::default()));
let clusterer: &'static dyn shiplog_ports::WorkstreamClusterer =
Box::leak(Box::new(RepoClusterer));
let redactor: &'static dyn shiplog_ports::Redactor = Box::leak(Box::new(
shiplog_redact::DeterministicRedactor::new(b"test-key"),
));
Engine::new(renderer, clusterer, redactor)
}
#[test]
fn run_creates_expected_output_files() {
let dir = tempfile::tempdir().unwrap();
let out_dir = dir.path().join("test_run_1");
let engine = test_engine();
let ingest = test_ingest();
let (outputs, _) = engine
.run(
ingest,
"tester",
"2025-01-01..2025-02-01",
&out_dir,
false,
&BundleProfile::Internal,
)
.unwrap();
assert!(outputs.packet_md.exists(), "packet.md missing");
assert!(
outputs.ledger_events_jsonl.exists(),
"ledger.events.jsonl missing"
);
assert!(
outputs.coverage_manifest_json.exists(),
"coverage.manifest.json missing"
);
assert!(
outputs.bundle_manifest_json.exists(),
"bundle.manifest.json missing"
);
assert!(
out_dir
.join(DIR_PROFILES)
.join(PROFILE_MANAGER)
.join(FILE_PACKET_MD)
.exists(),
"manager profile missing"
);
assert!(
out_dir
.join(DIR_PROFILES)
.join(PROFILE_PUBLIC)
.join(FILE_PACKET_MD)
.exists(),
"public profile missing"
);
}
#[test]
fn run_with_zip_creates_archive() {
let dir = tempfile::tempdir().unwrap();
let out_dir = dir.path().join("test_run_zip");
let engine = test_engine();
let ingest = test_ingest();
let (outputs, _) = engine
.run(
ingest,
"tester",
"2025-01-01..2025-02-01",
&out_dir,
true,
&BundleProfile::Internal,
)
.unwrap();
assert!(
outputs.zip_path.is_some(),
"zip_path should be Some when zip=true"
);
assert!(
outputs.zip_path.as_ref().unwrap().exists(),
"zip file missing"
);
}
#[test]
fn zip_path_internal_uses_plain_extension() {
let p = zip_path_for_profile(Path::new("/tmp/run_123"), "internal");
assert_eq!(p, Path::new("/tmp/run_123.zip"));
}
#[test]
fn zip_path_manager_includes_profile_name() {
let p = zip_path_for_profile(Path::new("/tmp/run_123"), "manager");
assert_eq!(p, Path::new("/tmp/run_123.manager.zip"));
}
}