use super::graph::has_cycle_impl;
use super::inner::InMemoryStorageInner;
use crate::domain::{Issue, IssueId};
use crate::error::{Error, Result};
use crate::storage::IssueStorage;
use rivets_jsonl::{read_jsonl_resilient, Warning as JsonlWarning};
use std::path::Path;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Mutex;
/// Non-fatal problems collected while loading issues from a JSONL file.
///
/// `load_from_jsonl` returns these alongside the storage it builds; each
/// warning corresponds to a line, record, or dependency that was skipped
/// rather than aborting the load.
#[derive(Debug, Clone)]
pub enum LoadWarning {
/// A line was reported as unusable by the JSONL reader. The loader maps both
/// the reader's `MalformedJson` and `SkippedLine` warnings onto this variant
/// (for `SkippedLine`, `error` carries the reader's `reason`).
MalformedJson { line_number: usize, error: String },
/// Issue `from` declares a dependency on `to`, but no issue with id `to`
/// was loaded, so the edge was not added to the dependency graph.
OrphanedDependency { from: IssueId, to: IssueId },
/// The cycle check reported that an edge from `from` to `to` would create a
/// cycle, so the edge was not added to the dependency graph.
CircularDependency { from: IssueId, to: IssueId },
/// A parsed issue failed `Issue::validate()` and was not loaded.
InvalidIssueData {
/// Id of the rejected issue.
issue_id: IssueId,
/// 1-based position of the issue among the records returned by the reader.
/// NOTE(review): this is a record index, so it may differ from the physical
/// line number when earlier lines were skipped — confirm.
line_number: usize,
/// Validation failure message returned by `Issue::validate()`.
error: String,
},
}
pub async fn load_from_jsonl(
path: &Path,
prefix: String,
) -> Result<(Box<dyn IssueStorage>, Vec<LoadWarning>)> {
let (parsed_issues, jsonl_warnings) =
read_jsonl_resilient::<Issue, _>(path)
.await
.map_err(|e| match e {
rivets_jsonl::Error::Io(io_err) => Error::Io(io_err),
rivets_jsonl::Error::Json(json_err) => Error::Json(json_err),
rivets_jsonl::Error::InvalidFormat(msg) => Error::Storage(msg),
})?;
let mut warnings = Vec::new();
for warning in jsonl_warnings {
match warning {
JsonlWarning::MalformedJson { line_number, error } => {
warnings.push(LoadWarning::MalformedJson { line_number, error });
}
JsonlWarning::SkippedLine {
line_number,
reason,
} => {
warnings.push(LoadWarning::MalformedJson {
line_number,
error: reason,
});
}
}
}
let mut issues = Vec::new();
for (index, issue) in parsed_issues.into_iter().enumerate() {
let record_number = index + 1; if let Err(validation_error) = issue.validate() {
warnings.push(LoadWarning::InvalidIssueData {
issue_id: issue.id.clone(),
line_number: record_number,
error: validation_error,
});
continue;
}
issues.push(issue);
}
let storage = Arc::new(Mutex::new(InMemoryStorageInner::new(prefix)));
let mut inner = storage.lock().await;
for issue in &issues {
let node = inner.graph.add_node(issue.id.clone());
inner.node_map.insert(issue.id.clone(), node);
inner.issues.insert(issue.id.clone(), issue.clone());
inner
.id_generator
.register_id(issue.id.as_str().to_string());
}
for issue in &issues {
for dep in &issue.dependencies {
if !inner.node_map.contains_key(&dep.depends_on_id) {
warnings.push(LoadWarning::OrphanedDependency {
from: issue.id.clone(),
to: dep.depends_on_id.clone(),
});
continue;
}
if has_cycle_impl(&inner.graph, &inner.node_map, &issue.id, &dep.depends_on_id)? {
warnings.push(LoadWarning::CircularDependency {
from: issue.id.clone(),
to: dep.depends_on_id.clone(),
});
continue;
}
let from_node = inner.node_map[&issue.id];
let to_node = inner.node_map[&dep.depends_on_id];
inner.graph.add_edge(from_node, to_node, dep.dep_type);
}
}
drop(inner);
Ok((Box::new(storage), warnings))
}
pub async fn save_to_jsonl(storage: &dyn IssueStorage, path: &Path) -> Result<()> {
let temp_path = path.with_extension("tmp");
let file = File::create(&temp_path).await.map_err(Error::Io)?;
let mut writer = BufWriter::new(file);
let mut issues = storage.export_all().await?;
for issue in &mut issues {
issue.dependencies.sort();
let json = serde_json::to_string(&issue)
.map_err(|e| Error::Storage(format!("JSON serialization failed: {}", e)))?;
writer.write_all(json.as_bytes()).await.map_err(Error::Io)?;
writer.write_all(b"\n").await.map_err(Error::Io)?;
}
writer.flush().await.map_err(Error::Io)?;
tokio::fs::rename(&temp_path, path)
.await
.map_err(Error::Io)?;
Ok(())
}