use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use petgraph::Direction;
use petgraph::algo::is_cyclic_directed;
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
pub struct ThreeStateModel {
pub application_state: Arc<ApplicationState>,
pub operation_state: Arc<OperationState>,
pub dependency_state: Arc<DependencyState>,
}
impl ThreeStateModel {
pub fn new() -> Self {
Self {
application_state: Arc::new(ApplicationState::new()),
operation_state: Arc::new(OperationState::new()),
dependency_state: Arc::new(DependencyState::new()),
}
}
pub async fn validate_operation(&self, op: &StateModelProposedOperation) -> ValidationResult {
let mut errors = Vec::new();
let mut warnings = Vec::new();
for resource in &op.resources_needed {
if !self.application_state.resource_exists(resource).await {
warnings.push(format!("Resource '{}' does not exist yet", resource));
}
}
let active_ops = self.operation_state.get_active_operations().await;
for active_op in active_ops {
let active_resources: HashSet<_> = active_op
.resources_needed
.iter()
.chain(active_op.resources_produced.iter())
.collect();
let proposed_resources: HashSet<_> = op
.resources_needed
.iter()
.chain(op.resources_produced.iter())
.collect();
let overlap: Vec<_> = active_resources.intersection(&proposed_resources).collect();
if !overlap.is_empty() {
errors.push(format!(
"Conflict with running operation '{}': shared resources {:?}",
active_op.id,
overlap.iter().map(|s| s.as_str()).collect::<Vec<_>>()
));
}
}
if self
.dependency_state
.would_deadlock(&op.agent_id, &op.resources_needed)
.await
{
errors.push("Operation would create a deadlock".to_string());
}
ValidationResult {
valid: errors.is_empty(),
errors,
warnings,
}
}
pub async fn record_state_change(&self, change: StateChange) {
for app_change in &change.application_changes {
match app_change {
ApplicationChange::FileModified { path, new_hash } => {
self.application_state
.update_file(path.clone(), new_hash.clone())
.await;
}
ApplicationChange::ArtifactInvalidated { artifact_id } => {
self.application_state
.invalidate_artifact(artifact_id)
.await;
}
ApplicationChange::GitStateChanged { new_state } => {
self.application_state
.update_git_state(new_state.clone())
.await;
}
ApplicationChange::ResourceCreated { resource_id } => {
self.application_state
.mark_resource_exists(resource_id)
.await;
}
ApplicationChange::ResourceDeleted { resource_id } => {
self.application_state
.mark_resource_deleted(resource_id)
.await;
}
}
}
for (from, to, edge) in &change.new_dependencies {
self.dependency_state
.add_dependency(from, to, edge.clone())
.await;
}
}
pub async fn snapshot(&self) -> StateSnapshot {
StateSnapshot {
files: self.application_state.get_all_files().await,
locks: self.dependency_state.get_current_holders().await,
git_state: self.application_state.get_git_state().await,
active_operations: self.operation_state.get_active_operation_ids().await,
}
}
}
impl Default for ThreeStateModel {
fn default() -> Self {
Self::new()
}
}
pub struct ApplicationState {
files: RwLock<HashMap<PathBuf, FileStatus>>,
build_artifacts: RwLock<HashMap<String, ArtifactStatus>>,
git_state: RwLock<GitState>,
resources: RwLock<HashSet<String>>,
}
impl ApplicationState {
pub fn new() -> Self {
Self {
files: RwLock::new(HashMap::new()),
build_artifacts: RwLock::new(HashMap::new()),
git_state: RwLock::new(GitState::default()),
resources: RwLock::new(HashSet::new()),
}
}
pub async fn resource_exists(&self, resource_id: &str) -> bool {
let path = PathBuf::from(resource_id);
if self.files.read().await.contains_key(&path) {
return true;
}
self.resources.read().await.contains(resource_id)
}
pub async fn mark_resource_exists(&self, resource_id: &str) {
self.resources.write().await.insert(resource_id.to_string());
}
pub async fn mark_resource_deleted(&self, resource_id: &str) {
self.resources.write().await.remove(resource_id);
}
pub async fn update_file(&self, path: PathBuf, content_hash: String) {
let mut files = self.files.write().await;
let status = files.entry(path).or_insert_with(|| FileStatus {
exists: true,
content_hash: String::new(),
last_modified: SystemTime::now(),
locked_by: None,
dirty: false,
});
status.content_hash = content_hash;
status.last_modified = SystemTime::now();
status.dirty = true;
status.exists = true;
}
pub async fn get_all_files(&self) -> HashMap<PathBuf, FileStatus> {
self.files.read().await.clone()
}
pub async fn invalidate_artifact(&self, artifact_id: &str) {
if let Some(artifact) = self.build_artifacts.write().await.get_mut(artifact_id) {
artifact.valid = false;
}
}
pub async fn update_git_state(&self, state: GitState) {
*self.git_state.write().await = state;
}
pub async fn get_git_state(&self) -> GitState {
self.git_state.read().await.clone()
}
pub async fn lock_file(&self, path: &PathBuf, agent_id: &str) {
if let Some(file) = self.files.write().await.get_mut(path) {
file.locked_by = Some(agent_id.to_string());
}
}
pub async fn unlock_file(&self, path: &PathBuf) {
if let Some(file) = self.files.write().await.get_mut(path) {
file.locked_by = None;
}
}
pub async fn mark_files_clean(&self) {
for file in self.files.write().await.values_mut() {
file.dirty = false;
}
}
pub async fn record_artifact(&self, artifact_id: String, source_hash: String) {
self.build_artifacts.write().await.insert(
artifact_id,
ArtifactStatus {
valid: true,
built_from_hash: source_hash,
build_time: Instant::now(),
},
);
}
}
impl Default for ApplicationState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileStatus {
pub exists: bool,
pub content_hash: String,
#[serde(skip, default = "default_system_time")]
pub last_modified: SystemTime,
pub locked_by: Option<String>,
pub dirty: bool,
}
fn default_system_time() -> SystemTime {
SystemTime::UNIX_EPOCH
}
#[derive(Debug, Clone)]
pub struct ArtifactStatus {
pub valid: bool,
pub built_from_hash: String,
pub build_time: Instant,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct GitState {
pub current_branch: String,
pub head_commit: String,
pub staged_files: Vec<String>,
pub modified_files: Vec<String>,
pub has_conflicts: bool,
}
pub struct OperationState {
operations: RwLock<HashMap<String, OperationLog>>,
agent_operations: RwLock<HashMap<String, Vec<String>>>,
active_operations: RwLock<HashSet<String>>,
next_id: RwLock<u64>,
}
impl OperationState {
pub fn new() -> Self {
Self {
operations: RwLock::new(HashMap::new()),
agent_operations: RwLock::new(HashMap::new()),
active_operations: RwLock::new(HashSet::new()),
next_id: RwLock::new(1),
}
}
pub async fn generate_id(&self) -> String {
let mut id = self.next_id.write().await;
let op_id = format!("op-{}", *id);
*id += 1;
op_id
}
pub async fn start_operation(&self, log: OperationLog) -> String {
let id = log.id.clone();
self.operations
.write()
.await
.insert(id.clone(), log.clone());
self.agent_operations
.write()
.await
.entry(log.agent_id.clone())
.or_default()
.push(id.clone());
self.active_operations.write().await.insert(id.clone());
id
}
pub async fn complete_operation(
&self,
operation_id: &str,
success: bool,
outputs: Option<serde_json::Value>,
error: Option<String>,
) {
self.active_operations.write().await.remove(operation_id);
if let Some(op) = self.operations.write().await.get_mut(operation_id) {
op.completed_at = Some(Instant::now());
op.status = if success {
OperationLogStatus::Completed
} else {
OperationLogStatus::Failed
};
op.outputs = outputs;
op.error = error;
}
}
pub async fn mark_compensated(&self, operation_id: &str) {
if let Some(op) = self.operations.write().await.get_mut(operation_id) {
op.status = OperationLogStatus::Compensated;
}
}
pub async fn get_active_operations(&self) -> Vec<OperationLog> {
let active_ids = self.active_operations.read().await.clone();
let operations = self.operations.read().await;
active_ids
.iter()
.filter_map(|id| operations.get(id).cloned())
.collect()
}
pub async fn get_active_operation_ids(&self) -> Vec<String> {
self.active_operations
.read()
.await
.iter()
.cloned()
.collect()
}
pub async fn get_operation(&self, operation_id: &str) -> Option<OperationLog> {
self.operations.read().await.get(operation_id).cloned()
}
pub async fn get_agent_operations(&self, agent_id: &str) -> Vec<OperationLog> {
let op_ids = self
.agent_operations
.read()
.await
.get(agent_id)
.cloned()
.unwrap_or_default();
let operations = self.operations.read().await;
op_ids
.iter()
.filter_map(|id| operations.get(id).cloned())
.collect()
}
pub async fn add_child_operation(&self, parent_id: &str, child_id: &str) {
let mut operations = self.operations.write().await;
if let Some(parent) = operations.get_mut(parent_id) {
parent.child_operations.push(child_id.to_string());
}
if let Some(child) = operations.get_mut(child_id) {
child.parent_operation = Some(parent_id.to_string());
}
}
}
impl Default for OperationState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct OperationLog {
pub id: String,
pub agent_id: String,
pub operation_type: String,
pub started_at: Instant,
pub completed_at: Option<Instant>,
pub status: OperationLogStatus,
pub inputs: serde_json::Value,
pub outputs: Option<serde_json::Value>,
pub error: Option<String>,
pub child_operations: Vec<String>,
pub parent_operation: Option<String>,
pub resources_needed: Vec<String>,
pub resources_produced: Vec<String>,
}
impl OperationLog {
pub fn new(
id: String,
agent_id: String,
operation_type: String,
inputs: serde_json::Value,
) -> Self {
Self {
id,
agent_id,
operation_type,
started_at: Instant::now(),
completed_at: None,
status: OperationLogStatus::Running,
inputs,
outputs: None,
error: None,
child_operations: Vec::new(),
parent_operation: None,
resources_needed: Vec::new(),
resources_produced: Vec::new(),
}
}
pub fn with_resources(mut self, needed: Vec<String>, produced: Vec<String>) -> Self {
self.resources_needed = needed;
self.resources_produced = produced;
self
}
pub fn duration(&self) -> Option<std::time::Duration> {
self.completed_at
.map(|end| end.duration_since(self.started_at))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OperationLogStatus {
Pending,
Running,
Completed,
Failed,
Compensated,
}
pub struct DependencyState {
graph: RwLock<DiGraph<ResourceNode, DependencyEdge>>,
resource_index: RwLock<HashMap<String, NodeIndex>>,
}
impl DependencyState {
pub fn new() -> Self {
Self {
graph: RwLock::new(DiGraph::new()),
resource_index: RwLock::new(HashMap::new()),
}
}
async fn ensure_node(&self, resource_id: &str, resource_type: ResourceNodeType) -> NodeIndex {
let mut index = self.resource_index.write().await;
let mut graph = self.graph.write().await;
if let Some(&node_idx) = index.get(resource_id) {
return node_idx;
}
let node = ResourceNode {
resource_id: resource_id.to_string(),
resource_type,
current_holder: None,
};
let node_idx = graph.add_node(node);
index.insert(resource_id.to_string(), node_idx);
node_idx
}
pub async fn add_dependency(&self, from: &str, to: &str, edge: DependencyEdge) {
let from_idx = self.ensure_node(from, ResourceNodeType::Generic).await;
let to_idx = self.ensure_node(to, ResourceNodeType::Generic).await;
self.graph.write().await.add_edge(to_idx, from_idx, edge);
}
pub async fn remove_dependency(&self, from: &str, to: &str) {
let index = self.resource_index.read().await;
let mut graph = self.graph.write().await;
if let (Some(&from_idx), Some(&to_idx)) = (index.get(from), index.get(to))
&& let Some(edge) = graph.find_edge(to_idx, from_idx)
{
graph.remove_edge(edge);
}
}
pub async fn would_deadlock(&self, agent_id: &str, resources: &[String]) -> bool {
let mut graph = self.graph.write().await;
let mut index = self.resource_index.write().await;
let agent_node_id = format!("agent:{}", agent_id);
let agent_idx = if let Some(&idx) = index.get(&agent_node_id) {
idx
} else {
let node = ResourceNode {
resource_id: agent_node_id.clone(),
resource_type: ResourceNodeType::Agent(agent_id.to_string()),
current_holder: None,
};
let idx = graph.add_node(node);
index.insert(agent_node_id.clone(), idx);
idx
};
let mut temp_edges = Vec::new();
for resource in resources {
if let Some(&resource_idx) = index.get(resource) {
let edge = graph.add_edge(
agent_idx,
resource_idx,
DependencyEdge {
dependency_type: DependencyType::WaitsFor,
strength: DependencyStrength::Hard,
},
);
temp_edges.push(edge);
}
}
let has_cycle = is_cyclic_directed(&*graph);
for edge in temp_edges {
graph.remove_edge(edge);
}
has_cycle
}
pub async fn get_blocking_resources(&self, resource_id: &str) -> Vec<String> {
let graph = self.graph.read().await;
let index = self.resource_index.read().await;
let mut blocking = Vec::new();
if let Some(&node_idx) = index.get(resource_id) {
for edge_ref in graph.edges_directed(node_idx, Direction::Incoming) {
if let Some(source_node) = graph.node_weight(edge_ref.source())
&& source_node.current_holder.is_some()
{
blocking.push(source_node.resource_id.clone());
}
}
}
blocking
}
pub async fn set_holder(&self, resource_id: &str, agent_id: Option<&str>) {
let index = self.resource_index.read().await;
let mut graph = self.graph.write().await;
if let Some(&node_idx) = index.get(resource_id)
&& let Some(node) = graph.node_weight_mut(node_idx)
{
node.current_holder = agent_id.map(String::from);
}
}
pub async fn get_current_holders(&self) -> HashMap<String, String> {
let graph = self.graph.read().await;
graph
.node_weights()
.filter_map(|node| {
node.current_holder
.as_ref()
.map(|holder| (node.resource_id.clone(), holder.clone()))
})
.collect()
}
pub async fn get_agent_resources(&self, agent_id: &str) -> Vec<String> {
let graph = self.graph.read().await;
graph
.node_weights()
.filter_map(|node| {
if node.current_holder.as_deref() == Some(agent_id) {
Some(node.resource_id.clone())
} else {
None
}
})
.collect()
}
pub async fn get_execution_order(&self, operation_ids: &[String]) -> Vec<String> {
let graph = self.graph.read().await;
let index = self.resource_index.read().await;
let mut ordered = Vec::new();
let mut remaining: HashSet<_> = operation_ids.iter().cloned().collect();
while !remaining.is_empty() {
let mut made_progress = false;
for op_id in remaining.clone() {
if let Some(&node_idx) = index.get(&op_id) {
let all_deps_satisfied = graph
.edges_directed(node_idx, Direction::Incoming)
.all(|edge| {
graph
.node_weight(edge.source())
.map(|n| !remaining.contains(&n.resource_id))
.unwrap_or(true)
});
if all_deps_satisfied {
ordered.push(op_id.clone());
remaining.remove(&op_id);
made_progress = true;
}
} else {
ordered.push(op_id.clone());
remaining.remove(&op_id);
made_progress = true;
}
}
if !made_progress {
ordered.extend(remaining.drain());
break;
}
}
ordered
}
}
impl Default for DependencyState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct ResourceNode {
pub resource_id: String,
pub resource_type: ResourceNodeType,
pub current_holder: Option<String>,
}
#[derive(Debug, Clone)]
pub enum ResourceNodeType {
File(PathBuf),
BuildLock,
TestLock,
GitIndex,
GitBranch(String),
Agent(String),
Generic,
}
#[derive(Debug, Clone)]
pub struct DependencyEdge {
pub dependency_type: DependencyType,
pub strength: DependencyStrength,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DependencyType {
BlockedBy,
Produces,
ConflictsWith,
Reads,
Writes,
WaitsFor,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DependencyStrength {
Hard,
Soft,
Advisory,
}
#[derive(Debug, Clone)]
pub struct ValidationResult {
pub valid: bool,
pub errors: Vec<String>,
pub warnings: Vec<String>,
}
impl ValidationResult {
pub fn ok() -> Self {
Self {
valid: true,
errors: Vec::new(),
warnings: Vec::new(),
}
}
pub fn error(msg: impl Into<String>) -> Self {
Self {
valid: false,
errors: vec![msg.into()],
warnings: Vec::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct StateModelProposedOperation {
pub agent_id: String,
pub operation_type: String,
pub resources_needed: Vec<String>,
pub resources_produced: Vec<String>,
}
#[derive(Debug, Clone)]
pub struct StateChange {
pub operation_id: String,
pub application_changes: Vec<ApplicationChange>,
pub new_dependencies: Vec<(String, String, DependencyEdge)>,
}
#[derive(Debug, Clone)]
pub enum ApplicationChange {
FileModified {
path: PathBuf,
new_hash: String,
},
ArtifactInvalidated {
artifact_id: String,
},
GitStateChanged {
new_state: GitState,
},
ResourceCreated {
resource_id: String,
},
ResourceDeleted {
resource_id: String,
},
}
#[derive(Debug, Clone)]
pub struct StateSnapshot {
pub files: HashMap<PathBuf, FileStatus>,
pub locks: HashMap<String, String>,
pub git_state: GitState,
pub active_operations: Vec<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_three_state_model_creation() {
let model = ThreeStateModel::new();
let snapshot = model.snapshot().await;
assert!(snapshot.files.is_empty());
assert!(snapshot.locks.is_empty());
assert!(snapshot.active_operations.is_empty());
}
#[tokio::test]
async fn test_application_state_file_tracking() {
let app_state = ApplicationState::new();
let path = PathBuf::from("/test/file.rs");
app_state
.update_file(path.clone(), "hash123".to_string())
.await;
let files = app_state.get_all_files().await;
assert!(files.contains_key(&path));
assert_eq!(files[&path].content_hash, "hash123");
assert!(files[&path].dirty);
}
#[tokio::test]
async fn test_operation_state_lifecycle() {
let op_state = OperationState::new();
let log = OperationLog::new(
"op-1".to_string(),
"agent-1".to_string(),
"build".to_string(),
serde_json::json!({}),
);
let id = op_state.start_operation(log).await;
assert_eq!(id, "op-1");
let active = op_state.get_active_operations().await;
assert_eq!(active.len(), 1);
op_state.complete_operation(&id, true, None, None).await;
let active = op_state.get_active_operations().await;
assert!(active.is_empty());
let op = op_state.get_operation(&id).await.unwrap();
assert_eq!(op.status, OperationLogStatus::Completed);
}
#[tokio::test]
async fn test_dependency_state_deadlock_detection() {
let dep_state = DependencyState::new();
dep_state
.add_dependency(
"resource-a",
"resource-b",
DependencyEdge {
dependency_type: DependencyType::BlockedBy,
strength: DependencyStrength::Hard,
},
)
.await;
dep_state.set_holder("resource-a", Some("agent-1")).await;
let would_deadlock = dep_state
.would_deadlock("agent-1", &["resource-b".to_string()])
.await;
assert!(!would_deadlock);
}
#[tokio::test]
async fn test_validate_operation_conflict_detection() {
let model = ThreeStateModel::new();
let log = OperationLog::new(
"op-1".to_string(),
"agent-1".to_string(),
"build".to_string(),
serde_json::json!({}),
)
.with_resources(vec!["resource-a".to_string()], vec![]);
model.operation_state.start_operation(log).await;
let proposed = StateModelProposedOperation {
agent_id: "agent-2".to_string(),
operation_type: "build".to_string(),
resources_needed: vec!["resource-a".to_string()],
resources_produced: vec![],
};
let result = model.validate_operation(&proposed).await;
assert!(!result.valid);
assert!(!result.errors.is_empty());
}
#[tokio::test]
async fn test_state_change_recording() {
let model = ThreeStateModel::new();
let change = StateChange {
operation_id: "op-1".to_string(),
application_changes: vec![
ApplicationChange::FileModified {
path: PathBuf::from("/test/file.rs"),
new_hash: "newhash".to_string(),
},
ApplicationChange::ResourceCreated {
resource_id: "build-artifact".to_string(),
},
],
new_dependencies: vec![],
};
model.record_state_change(change).await;
let snapshot = model.snapshot().await;
assert!(snapshot.files.contains_key(&PathBuf::from("/test/file.rs")));
assert!(
model
.application_state
.resource_exists("build-artifact")
.await
);
}
#[tokio::test]
async fn test_execution_order() {
let dep_state = DependencyState::new();
dep_state
.add_dependency(
"op-a",
"op-b",
DependencyEdge {
dependency_type: DependencyType::BlockedBy,
strength: DependencyStrength::Hard,
},
)
.await;
let order = dep_state
.get_execution_order(&["op-a".to_string(), "op-b".to_string()])
.await;
let pos_a = order.iter().position(|x| x == "op-a").unwrap();
let pos_b = order.iter().position(|x| x == "op-b").unwrap();
assert!(pos_b < pos_a);
}
}