use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::crypto::{hash, Hash, PublicKey, Sig};
use crate::error::{Error, Result};
use crate::event::ResourceId;
use super::capability::{CapabilityId, CapabilityKind};
use super::causality::CausalContext;
use super::outcome::ActionOutcome;
use super::principal::PrincipalId;
pub type DurationMs = i64;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CoordinationId(pub [u8; 16]);
impl CoordinationId {
pub fn generate() -> Self {
use rand::RngCore;
let mut bytes = [0u8; 16];
rand::thread_rng().fill_bytes(&mut bytes);
Self(bytes)
}
pub fn from_bytes(bytes: [u8; 16]) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
pub fn to_hex(&self) -> String {
hex::encode(self.0)
}
pub fn from_hex(s: &str) -> Result<Self> {
let bytes = hex::decode(s).map_err(|_| Error::invalid_input("invalid hex"))?;
if bytes.len() != 16 {
return Err(Error::invalid_input("coordination ID must be 16 bytes"));
}
let mut arr = [0u8; 16];
arr.copy_from_slice(&bytes);
Ok(Self(arr))
}
}
impl std::fmt::Display for CoordinationId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_hex())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TaskId(pub [u8; 16]);
impl TaskId {
pub fn generate() -> Self {
use rand::RngCore;
let mut bytes = [0u8; 16];
rand::thread_rng().fill_bytes(&mut bytes);
Self(bytes)
}
pub fn from_bytes(bytes: [u8; 16]) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
pub fn to_hex(&self) -> String {
hex::encode(self.0)
}
}
impl std::fmt::Display for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_hex())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CoordinationType {
Parallel,
Pipeline,
Consensus,
Supervised,
Competitive,
}
impl std::fmt::Display for CoordinationType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CoordinationType::Parallel => write!(f, "parallel"),
CoordinationType::Pipeline => write!(f, "pipeline"),
CoordinationType::Consensus => write!(f, "consensus"),
CoordinationType::Supervised => write!(f, "supervised"),
CoordinationType::Competitive => write!(f, "competitive"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "role", rename_all = "snake_case")]
pub enum ParticipantRole {
Coordinator,
Peer,
Supervisor,
ServiceProvider {
service: String,
},
Observer,
}
impl ParticipantRole {
pub fn service_provider(service: impl Into<String>) -> Self {
Self::ServiceProvider {
service: service.into(),
}
}
pub fn can_execute(&self) -> bool {
!matches!(self, ParticipantRole::Observer)
}
pub fn can_supervise(&self) -> bool {
matches!(
self,
ParticipantRole::Coordinator | ParticipantRole::Supervisor
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Responsibility {
Individual,
Shared {
share: f64,
},
Delegated {
delegator: PublicKey,
},
Supervised {
supervisor: PublicKey,
},
}
impl Responsibility {
pub fn individual() -> Self {
Self::Individual
}
pub fn shared(share: f64) -> Self {
Self::Shared {
share: share.clamp(0.0, 1.0),
}
}
pub fn delegated(delegator: PublicKey) -> Self {
Self::Delegated { delegator }
}
pub fn supervised(supervisor: PublicKey) -> Self {
Self::Supervised { supervisor }
}
pub fn share(&self) -> f64 {
match self {
Responsibility::Individual => 1.0,
Responsibility::Shared { share } => *share,
Responsibility::Delegated { .. } => 0.0, Responsibility::Supervised { .. } => 0.0, }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Participant {
agent: PublicKey,
role: ParticipantRole,
capabilities: Vec<CapabilityId>,
responsibility: Responsibility,
commitment: Sig,
}
impl Participant {
pub fn new(
agent: PublicKey,
role: ParticipantRole,
responsibility: Responsibility,
commitment: Sig,
) -> Self {
Self {
agent,
role,
capabilities: Vec::new(),
responsibility,
commitment,
}
}
pub fn with_commitment(
agent: PublicKey,
role: ParticipantRole,
responsibility: Responsibility,
commitment: Sig,
) -> Self {
Self {
agent,
role,
capabilities: Vec::new(),
responsibility,
commitment,
}
}
pub fn with_capabilities(mut self, capabilities: Vec<CapabilityId>) -> Self {
self.capabilities = capabilities;
self
}
pub fn agent(&self) -> &PublicKey {
&self.agent
}
pub fn role(&self) -> &ParticipantRole {
&self.role
}
pub fn capabilities(&self) -> &[CapabilityId] {
&self.capabilities
}
pub fn responsibility(&self) -> &Responsibility {
&self.responsibility
}
pub fn commitment(&self) -> &Sig {
&self.commitment
}
pub fn is_coordinator(&self) -> bool {
matches!(self.role, ParticipantRole::Coordinator)
}
pub fn verify_commitment(&self, spec: &CoordinatedActionSpec) -> Result<()> {
let message = spec.canonical_bytes();
self.agent.verify(&message, &self.commitment).map_err(|_| {
Error::invalid_input("participant commitment does not verify against spec")
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
id: TaskId,
description: String,
required_capabilities: Vec<CapabilityKind>,
deadline: Option<i64>,
}
impl Task {
pub fn new(description: impl Into<String>) -> Self {
Self {
id: TaskId::generate(),
description: description.into(),
required_capabilities: Vec::new(),
deadline: None,
}
}
pub fn with_id(mut self, id: TaskId) -> Self {
self.id = id;
self
}
pub fn with_capabilities(mut self, capabilities: Vec<CapabilityKind>) -> Self {
self.required_capabilities = capabilities;
self
}
pub fn with_deadline(mut self, deadline: i64) -> Self {
self.deadline = Some(deadline);
self
}
pub fn id(&self) -> TaskId {
self.id
}
pub fn description(&self) -> &str {
&self.description
}
pub fn required_capabilities(&self) -> &[CapabilityKind] {
&self.required_capabilities
}
pub fn deadline(&self) -> Option<i64> {
self.deadline
}
pub fn is_overdue(&self) -> bool {
if let Some(deadline) = self.deadline {
chrono::Utc::now().timestamp_millis() > deadline
} else {
false
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDependency {
task: TaskId,
depends_on: Vec<TaskId>,
}
impl TaskDependency {
pub fn new(task: TaskId, depends_on: Vec<TaskId>) -> Self {
Self { task, depends_on }
}
pub fn task(&self) -> TaskId {
self.task
}
pub fn depends_on(&self) -> &[TaskId] {
&self.depends_on
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FailureHandling {
AbortAll,
ContinuePartial,
Retry {
max_attempts: u32,
},
Escalate,
}
impl Default for FailureHandling {
fn default() -> Self {
Self::AbortAll
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatedActionSpec {
goal: String,
tasks: HashMap<PublicKey, Vec<Task>>,
dependencies: Vec<TaskDependency>,
success_criteria: Vec<String>,
failure_handling: FailureHandling,
}
impl CoordinatedActionSpec {
pub fn new(goal: impl Into<String>) -> Self {
Self {
goal: goal.into(),
tasks: HashMap::new(),
dependencies: Vec::new(),
success_criteria: Vec::new(),
failure_handling: FailureHandling::default(),
}
}
pub fn with_tasks(mut self, agent: &PublicKey, tasks: Vec<Task>) -> Self {
self.tasks.insert(agent.clone(), tasks);
self
}
pub fn with_dependency(mut self, dependency: TaskDependency) -> Self {
self.dependencies.push(dependency);
self
}
pub fn with_criterion(mut self, criterion: impl Into<String>) -> Self {
self.success_criteria.push(criterion.into());
self
}
pub fn with_failure_handling(mut self, handling: FailureHandling) -> Self {
self.failure_handling = handling;
self
}
pub fn goal(&self) -> &str {
&self.goal
}
pub fn tasks_for(&self, agent: &PublicKey) -> Option<&[Task]> {
self.tasks.get(agent).map(|v| v.as_slice())
}
pub fn all_tasks(&self) -> impl Iterator<Item = &Task> {
self.tasks.values().flat_map(|v| v.iter())
}
pub fn dependencies(&self) -> &[TaskDependency] {
&self.dependencies
}
pub fn success_criteria(&self) -> &[String] {
&self.success_criteria
}
pub fn failure_handling(&self) -> &FailureHandling {
&self.failure_handling
}
pub fn canonical_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap_or_default()
}
pub fn hash(&self) -> Hash {
hash(&self.canonical_bytes())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CoordinationProtocol {
TwoPhaseCommit,
Consensus {
threshold: f64,
},
LeaderFollower,
Async {
timeout: DurationMs,
},
Custom {
protocol_id: String,
},
}
impl CoordinationProtocol {
pub fn consensus(threshold: f64) -> Self {
Self::Consensus {
threshold: threshold.clamp(0.0, 1.0),
}
}
pub fn async_with_timeout(timeout: DurationMs) -> Self {
Self::Async { timeout }
}
pub fn custom(protocol_id: impl Into<String>) -> Self {
Self::Custom {
protocol_id: protocol_id.into(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinationMetrics {
total_duration: DurationMs,
per_agent_duration: HashMap<PublicKey, DurationMs>,
communication_overhead: DurationMs,
retry_count: u32,
}
impl CoordinationMetrics {
pub fn new(total_duration: DurationMs) -> Self {
Self {
total_duration,
per_agent_duration: HashMap::new(),
communication_overhead: 0,
retry_count: 0,
}
}
pub fn with_agent_duration(mut self, agent: &PublicKey, duration: DurationMs) -> Self {
self.per_agent_duration.insert(agent.clone(), duration);
self
}
pub fn with_overhead(mut self, overhead: DurationMs) -> Self {
self.communication_overhead = overhead;
self
}
pub fn with_retries(mut self, count: u32) -> Self {
self.retry_count = count;
self
}
pub fn total_duration(&self) -> DurationMs {
self.total_duration
}
pub fn agent_duration(&self, agent: &PublicKey) -> Option<DurationMs> {
self.per_agent_duration.get(agent).copied()
}
pub fn communication_overhead(&self) -> DurationMs {
self.communication_overhead
}
pub fn retry_count(&self) -> u32 {
self.retry_count
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinationResult {
outcome: ActionOutcome,
agent_outcomes: HashMap<PublicKey, ActionOutcome>,
output: serde_json::Value,
metrics: CoordinationMetrics,
}
impl CoordinationResult {
pub fn new(
outcome: ActionOutcome,
output: serde_json::Value,
metrics: CoordinationMetrics,
) -> Self {
Self {
outcome,
agent_outcomes: HashMap::new(),
output,
metrics,
}
}
pub fn with_agent_outcome(mut self, agent: &PublicKey, outcome: ActionOutcome) -> Self {
self.agent_outcomes.insert(agent.clone(), outcome);
self
}
pub fn outcome(&self) -> &ActionOutcome {
&self.outcome
}
pub fn agent_outcome(&self, agent: &PublicKey) -> Option<&ActionOutcome> {
self.agent_outcomes.get(agent)
}
pub fn output(&self) -> &serde_json::Value {
&self.output
}
pub fn metrics(&self) -> &CoordinationMetrics {
&self.metrics
}
pub fn is_success(&self) -> bool {
self.outcome.is_success()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum CoordinationStatus {
Initializing,
WaitingCommitment,
Active {
progress: f64,
},
Completed {
result: CoordinationResult,
},
Failed {
reason: String,
partial_result: Option<CoordinationResult>,
},
Aborted {
reason: String,
aborted_by: PublicKey,
},
}
impl CoordinationStatus {
pub fn active(progress: f64) -> Self {
Self::Active {
progress: progress.clamp(0.0, 1.0),
}
}
pub fn completed(result: CoordinationResult) -> Self {
Self::Completed { result }
}
pub fn failed(reason: impl Into<String>, partial_result: Option<CoordinationResult>) -> Self {
Self::Failed {
reason: reason.into(),
partial_result,
}
}
pub fn aborted(reason: impl Into<String>, aborted_by: PublicKey) -> Self {
Self::Aborted {
reason: reason.into(),
aborted_by,
}
}
pub fn is_active(&self) -> bool {
matches!(
self,
CoordinationStatus::Initializing
| CoordinationStatus::WaitingCommitment
| CoordinationStatus::Active { .. }
)
}
pub fn is_terminal(&self) -> bool {
!self.is_active()
}
pub fn progress(&self) -> Option<f64> {
match self {
CoordinationStatus::Active { progress } => Some(*progress),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatedAction {
id: CoordinationId,
coordination_type: CoordinationType,
participants: Vec<Participant>,
action: CoordinatedActionSpec,
protocol: CoordinationProtocol,
started_at: i64,
status: CoordinationStatus,
causal_context: CausalContext,
}
impl CoordinatedAction {
pub fn builder() -> CoordinatedActionBuilder {
CoordinatedActionBuilder::new()
}
pub fn id(&self) -> CoordinationId {
self.id
}
pub fn coordination_type(&self) -> &CoordinationType {
&self.coordination_type
}
pub fn participants(&self) -> &[Participant] {
&self.participants
}
pub fn action(&self) -> &CoordinatedActionSpec {
&self.action
}
pub fn protocol(&self) -> &CoordinationProtocol {
&self.protocol
}
pub fn started_at(&self) -> i64 {
self.started_at
}
pub fn status(&self) -> &CoordinationStatus {
&self.status
}
pub fn causal_context(&self) -> &CausalContext {
&self.causal_context
}
pub fn set_status(&mut self, status: CoordinationStatus) {
self.status = status;
}
pub fn coordinator(&self) -> Option<&Participant> {
self.participants.iter().find(|p| p.is_coordinator())
}
pub fn validate_coordinator(&self) -> Result<()> {
let coordinator_count = self
.participants
.iter()
.filter(|p| p.is_coordinator())
.count();
if coordinator_count != 1 {
return Err(Error::invalid_input(format!(
"coordination must have exactly one coordinator, found {}",
coordinator_count
)));
}
Ok(())
}
pub fn validate_commitments(&self) -> Result<()> {
for (i, participant) in self.participants.iter().enumerate() {
participant.verify_commitment(&self.action).map_err(|_| {
Error::invalid_input(format!(
"participant {} commitment does not verify against action spec",
i
))
})?;
}
Ok(())
}
pub fn validate_responsibility(&self) -> Result<()> {
let shared_sum: f64 = self
.participants
.iter()
.filter_map(|p| match &p.responsibility {
Responsibility::Shared { share } => Some(*share),
_ => None,
})
.sum();
if shared_sum > 0.0 && (shared_sum - 1.0).abs() > 0.001 {
return Err(Error::invalid_input(format!(
"shared responsibility must sum to 1.0, got {}",
shared_sum
)));
}
Ok(())
}
}
#[derive(Debug, Default)]
pub struct CoordinatedActionBuilder {
id: Option<CoordinationId>,
coordination_type: Option<CoordinationType>,
participants: Vec<Participant>,
action: Option<CoordinatedActionSpec>,
protocol: Option<CoordinationProtocol>,
started_at: Option<i64>,
status: Option<CoordinationStatus>,
causal_context: Option<CausalContext>,
}
impl CoordinatedActionBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn id(mut self, id: CoordinationId) -> Self {
self.id = Some(id);
self
}
pub fn coordination_type(mut self, coordination_type: CoordinationType) -> Self {
self.coordination_type = Some(coordination_type);
self
}
pub fn participant(mut self, participant: Participant) -> Self {
self.participants.push(participant);
self
}
pub fn action(mut self, action: CoordinatedActionSpec) -> Self {
self.action = Some(action);
self
}
pub fn protocol(mut self, protocol: CoordinationProtocol) -> Self {
self.protocol = Some(protocol);
self
}
pub fn started_at(mut self, timestamp: i64) -> Self {
self.started_at = Some(timestamp);
self
}
pub fn started_now(mut self) -> Self {
self.started_at = Some(chrono::Utc::now().timestamp_millis());
self
}
pub fn status(mut self, status: CoordinationStatus) -> Self {
self.status = Some(status);
self
}
pub fn causal_context(mut self, context: CausalContext) -> Self {
self.causal_context = Some(context);
self
}
pub fn build(self) -> Result<CoordinatedAction> {
let id = self.id.unwrap_or_else(CoordinationId::generate);
let coordination_type = self
.coordination_type
.ok_or_else(|| Error::invalid_input("coordination_type is required"))?;
if self.participants.is_empty() {
return Err(Error::invalid_input("at least one participant is required"));
}
let action = self
.action
.ok_or_else(|| Error::invalid_input("action is required"))?;
let protocol = self
.protocol
.ok_or_else(|| Error::invalid_input("protocol is required"))?;
let started_at = self
.started_at
.unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
let status = self.status.unwrap_or(CoordinationStatus::Initializing);
let causal_context = self
.causal_context
.ok_or_else(|| Error::invalid_input("causal_context is required"))?;
let coordination = CoordinatedAction {
id,
coordination_type,
participants: self.participants,
action,
protocol,
started_at,
status,
causal_context,
};
coordination.validate_coordinator()?;
coordination.validate_responsibility()?;
Ok(coordination)
}
pub fn build_verified(self) -> Result<CoordinatedAction> {
let coordination = self.build()?;
coordination.validate_commitments()?;
Ok(coordination)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CoordinationEvent {
Started {
action: CoordinatedAction,
},
ParticipantJoined {
coordination_id: CoordinationId,
participant: Participant,
},
TaskAssigned {
coordination_id: CoordinationId,
agent: PublicKey,
task: Task,
},
TaskCompleted {
coordination_id: CoordinationId,
task_id: TaskId,
agent: PublicKey,
outcome: ActionOutcome,
},
Message {
coordination_id: CoordinationId,
from: PublicKey,
to: PublicKey,
message_hash: Hash,
},
Disagreement {
coordination_id: CoordinationId,
agents: Vec<PublicKey>,
subject: String,
positions: HashMap<PublicKey, String>,
},
Completed {
coordination_id: CoordinationId,
result: CoordinationResult,
},
Failed {
coordination_id: CoordinationId,
reason: String,
failed_at: i64,
},
}
impl CoordinationEvent {
pub fn started(action: CoordinatedAction) -> Self {
Self::Started { action }
}
pub fn participant_joined(coordination_id: CoordinationId, participant: Participant) -> Self {
Self::ParticipantJoined {
coordination_id,
participant,
}
}
pub fn task_assigned(coordination_id: CoordinationId, agent: PublicKey, task: Task) -> Self {
Self::TaskAssigned {
coordination_id,
agent,
task,
}
}
pub fn task_completed(
coordination_id: CoordinationId,
task_id: TaskId,
agent: PublicKey,
outcome: ActionOutcome,
) -> Self {
Self::TaskCompleted {
coordination_id,
task_id,
agent,
outcome,
}
}
pub fn message(
coordination_id: CoordinationId,
from: PublicKey,
to: PublicKey,
message_hash: Hash,
) -> Self {
Self::Message {
coordination_id,
from,
to,
message_hash,
}
}
pub fn disagreement(
coordination_id: CoordinationId,
agents: Vec<PublicKey>,
subject: impl Into<String>,
) -> Self {
Self::Disagreement {
coordination_id,
agents,
subject: subject.into(),
positions: HashMap::new(),
}
}
pub fn completed(coordination_id: CoordinationId, result: CoordinationResult) -> Self {
Self::Completed {
coordination_id,
result,
}
}
pub fn failed(coordination_id: CoordinationId, reason: impl Into<String>) -> Self {
Self::Failed {
coordination_id,
reason: reason.into(),
failed_at: chrono::Utc::now().timestamp_millis(),
}
}
pub fn coordination_id(&self) -> CoordinationId {
match self {
CoordinationEvent::Started { action } => action.id(),
CoordinationEvent::ParticipantJoined {
coordination_id, ..
} => *coordination_id,
CoordinationEvent::TaskAssigned {
coordination_id, ..
} => *coordination_id,
CoordinationEvent::TaskCompleted {
coordination_id, ..
} => *coordination_id,
CoordinationEvent::Message {
coordination_id, ..
} => *coordination_id,
CoordinationEvent::Disagreement {
coordination_id, ..
} => *coordination_id,
CoordinationEvent::Completed {
coordination_id, ..
} => *coordination_id,
CoordinationEvent::Failed {
coordination_id, ..
} => *coordination_id,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuorumPolicy {
pub required_votes: u32,
pub total_participants: u32,
pub timeout_ms: u64,
pub abstentions_count: bool,
}
impl QuorumPolicy {
pub fn majority(total: u32) -> Self {
Self {
required_votes: total / 2 + 1,
total_participants: total,
timeout_ms: 30_000,
abstentions_count: false,
}
}
pub fn unanimous(total: u32) -> Self {
Self {
required_votes: total,
total_participants: total,
timeout_ms: 60_000,
abstentions_count: false,
}
}
pub fn threshold(required: u32, total: u32) -> Self {
Self {
required_votes: required,
total_participants: total,
timeout_ms: 30_000,
abstentions_count: false,
}
}
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = timeout_ms;
self
}
pub fn is_met(&self, votes_for: u32) -> bool {
votes_for >= self.required_votes
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
pub id: ConflictId,
pub agents: Vec<PublicKey>,
pub resources: Vec<ResourceId>,
pub description: String,
pub detected_at: i64,
pub status: ConflictStatus,
}
define_id!(ConflictId, "conflict identifier");
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ConflictStatus {
Detected,
Resolving,
Resolved {
resolution: Box<ConflictResolutionMethod>,
resolved_at: i64,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum ConflictResolutionMethod {
FirstWriter,
Priority { winner: PublicKey },
HumanArbitration { arbiter: PrincipalId },
Merge,
Rollback,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CoordinationCostTracker {
pub compute_cost: u64,
pub api_calls: u64,
pub tokens_consumed: u64,
pub budget_limit: Option<u64>,
pub per_agent_costs: std::collections::HashMap<PublicKey, u64>,
}
impl CoordinationCostTracker {
pub fn new() -> Self {
Self::default()
}
pub fn with_budget(mut self, limit: u64) -> Self {
self.budget_limit = Some(limit);
self
}
pub fn record_cost(&mut self, agent: &PublicKey, cost: u64) {
self.compute_cost = self.compute_cost.saturating_add(cost);
*self.per_agent_costs.entry(agent.clone()).or_default() += cost;
}
pub fn record_api_call(&mut self) {
self.api_calls += 1;
}
pub fn record_tokens(&mut self, tokens: u64) {
self.tokens_consumed = self.tokens_consumed.saturating_add(tokens);
}
pub fn is_over_budget(&self) -> bool {
match self.budget_limit {
Some(limit) => self.compute_cost >= limit,
None => false,
}
}
pub fn remaining_budget(&self) -> Option<u64> {
self.budget_limit
.map(|limit| limit.saturating_sub(self.compute_cost))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agent::principal::PrincipalId;
use crate::agent::session::SessionId;
use crate::crypto::SecretKey;
use crate::event::EventId;
fn test_key() -> SecretKey {
SecretKey::generate()
}
fn test_causal_context() -> CausalContext {
let principal = PrincipalId::user("test@example.com").unwrap();
CausalContext::builder()
.parent_event_id(EventId(hash(b"parent")))
.root_event_id(EventId(hash(b"root")))
.session_id(SessionId::random())
.principal(principal)
.sequence(1)
.depth(1)
.build()
.unwrap()
}
#[test]
fn coordination_id_generates_unique() {
let id1 = CoordinationId::generate();
let id2 = CoordinationId::generate();
assert_ne!(id1, id2);
}
#[test]
fn coordination_id_hex_roundtrip() {
let id = CoordinationId::generate();
let hex = id.to_hex();
let restored = CoordinationId::from_hex(&hex).unwrap();
assert_eq!(id, restored);
}
#[test]
fn task_id_generates_unique() {
let id1 = TaskId::generate();
let id2 = TaskId::generate();
assert_ne!(id1, id2);
}
#[test]
fn coordination_type_display() {
assert_eq!(CoordinationType::Parallel.to_string(), "parallel");
assert_eq!(CoordinationType::Pipeline.to_string(), "pipeline");
assert_eq!(CoordinationType::Consensus.to_string(), "consensus");
}
#[test]
fn participant_role_can_execute() {
assert!(ParticipantRole::Coordinator.can_execute());
assert!(ParticipantRole::Peer.can_execute());
assert!(!ParticipantRole::Observer.can_execute());
}
#[test]
fn participant_role_can_supervise() {
assert!(ParticipantRole::Coordinator.can_supervise());
assert!(ParticipantRole::Supervisor.can_supervise());
assert!(!ParticipantRole::Peer.can_supervise());
assert!(!ParticipantRole::Observer.can_supervise());
}
#[test]
fn responsibility_share() {
assert_eq!(Responsibility::individual().share(), 1.0);
assert_eq!(Responsibility::shared(0.5).share(), 0.5);
let key = test_key();
assert_eq!(Responsibility::delegated(key.public_key()).share(), 0.0);
assert_eq!(Responsibility::supervised(key.public_key()).share(), 0.0);
}
#[test]
fn responsibility_share_clamped() {
assert_eq!(Responsibility::shared(1.5).share(), 1.0);
assert_eq!(Responsibility::shared(-0.5).share(), 0.0);
}
#[test]
fn task_creation() {
let task = Task::new("Process data")
.with_capabilities(vec![CapabilityKind::Read])
.with_deadline(chrono::Utc::now().timestamp_millis() + 60000);
assert_eq!(task.description(), "Process data");
assert_eq!(task.required_capabilities().len(), 1);
assert!(task.deadline().is_some());
}
#[test]
fn task_overdue() {
let past = chrono::Utc::now().timestamp_millis() - 1000;
let task = Task::new("Late task").with_deadline(past);
assert!(task.is_overdue());
let future = chrono::Utc::now().timestamp_millis() + 60000;
let task = Task::new("Future task").with_deadline(future);
assert!(!task.is_overdue());
}
#[test]
fn action_spec_hash_deterministic() {
let spec1 = CoordinatedActionSpec::new("Complete task");
let spec2 = CoordinatedActionSpec::new("Complete task");
assert_eq!(spec1.hash(), spec2.hash());
}
#[test]
fn coordination_status_is_active() {
assert!(CoordinationStatus::Initializing.is_active());
assert!(CoordinationStatus::WaitingCommitment.is_active());
assert!(CoordinationStatus::active(0.5).is_active());
let key = test_key();
assert!(!CoordinationStatus::aborted("test", key.public_key()).is_active());
}
#[test]
fn coordination_status_progress() {
assert_eq!(CoordinationStatus::active(0.75).progress(), Some(0.75));
assert_eq!(CoordinationStatus::Initializing.progress(), None);
}
#[test]
fn coordinated_action_requires_coordinator() {
let key = test_key();
let participant = Participant::new(
key.public_key(),
ParticipantRole::Peer, Responsibility::individual(),
Sig::empty(),
);
let result = CoordinatedAction::builder()
.coordination_type(CoordinationType::Parallel)
.participant(participant)
.action(CoordinatedActionSpec::new("Test"))
.protocol(CoordinationProtocol::TwoPhaseCommit)
.causal_context(test_causal_context())
.started_now()
.build();
assert!(result.is_err());
}
#[test]
fn coordinated_action_valid() {
let coordinator_key = test_key();
let peer_key = test_key();
let coordinator = Participant::new(
coordinator_key.public_key(),
ParticipantRole::Coordinator,
Responsibility::individual(),
Sig::empty(),
);
let peer = Participant::new(
peer_key.public_key(),
ParticipantRole::Peer,
Responsibility::individual(),
Sig::empty(),
);
let action = CoordinatedAction::builder()
.coordination_type(CoordinationType::Parallel)
.participant(coordinator)
.participant(peer)
.action(CoordinatedActionSpec::new("Complete task together"))
.protocol(CoordinationProtocol::TwoPhaseCommit)
.causal_context(test_causal_context())
.started_now()
.build()
.unwrap();
assert!(action.coordinator().is_some());
assert_eq!(action.participants().len(), 2);
}
#[test]
fn coordinated_action_shared_responsibility_must_sum() {
let key1 = test_key();
let key2 = test_key();
let p1 = Participant::new(
key1.public_key(),
ParticipantRole::Coordinator,
Responsibility::shared(0.3),
Sig::empty(),
);
let p2 = Participant::new(
key2.public_key(),
ParticipantRole::Peer,
Responsibility::shared(0.3), Sig::empty(),
);
let result = CoordinatedAction::builder()
.coordination_type(CoordinationType::Parallel)
.participant(p1)
.participant(p2)
.action(CoordinatedActionSpec::new("Test"))
.protocol(CoordinationProtocol::TwoPhaseCommit)
.causal_context(test_causal_context())
.build();
assert!(result.is_err());
}
#[test]
fn coordinated_action_valid_shared_responsibility() {
let key1 = test_key();
let key2 = test_key();
let p1 = Participant::new(
key1.public_key(),
ParticipantRole::Coordinator,
Responsibility::shared(0.6),
Sig::empty(),
);
let p2 = Participant::new(
key2.public_key(),
ParticipantRole::Peer,
Responsibility::shared(0.4), Sig::empty(),
);
let result = CoordinatedAction::builder()
.coordination_type(CoordinationType::Parallel)
.participant(p1)
.participant(p2)
.action(CoordinatedActionSpec::new("Test"))
.protocol(CoordinationProtocol::TwoPhaseCommit)
.causal_context(test_causal_context())
.build();
assert!(result.is_ok());
}
#[test]
fn participant_commitment_verifies_against_spec() {
let key = test_key();
let spec = CoordinatedActionSpec::new("deploy-service");
let spec_bytes = spec.canonical_bytes();
let commitment = key.sign(&spec_bytes);
let participant = Participant::with_commitment(
key.public_key(),
ParticipantRole::Coordinator,
Responsibility::individual(),
commitment,
);
assert!(participant.verify_commitment(&spec).is_ok());
}
#[test]
fn participant_commitment_wrong_spec_rejected() {
let key = test_key();
let spec_a = CoordinatedActionSpec::new("deploy-service");
let spec_b = CoordinatedActionSpec::new("rollback-service");
let commitment = key.sign(&spec_a.canonical_bytes());
let participant = Participant::with_commitment(
key.public_key(),
ParticipantRole::Peer,
Responsibility::individual(),
commitment,
);
assert!(participant.verify_commitment(&spec_b).is_err());
}
#[test]
fn participant_commitment_wrong_key_rejected() {
let real_key = test_key();
let wrong_key = test_key();
let spec = CoordinatedActionSpec::new("deploy-service");
let commitment = real_key.sign(&spec.canonical_bytes());
let participant = Participant::with_commitment(
wrong_key.public_key(), ParticipantRole::Peer,
Responsibility::individual(),
commitment, );
assert!(participant.verify_commitment(&spec).is_err());
}
#[test]
fn participant_empty_commitment_rejected() {
let key = test_key();
let spec = CoordinatedActionSpec::new("deploy-service");
let participant = Participant::new(
key.public_key(),
ParticipantRole::Coordinator,
Responsibility::individual(),
Sig::empty(),
);
assert!(participant.verify_commitment(&spec).is_err());
}
#[test]
fn coordinated_action_build_verified_validates_commitments() {
let coord_key = test_key();
let peer_key = test_key();
let spec = CoordinatedActionSpec::new("deploy");
let coord_commitment = coord_key.sign(&spec.canonical_bytes());
let p1 = Participant::with_commitment(
coord_key.public_key(),
ParticipantRole::Coordinator,
Responsibility::individual(),
coord_commitment,
);
let p2 = Participant::new(
peer_key.public_key(),
ParticipantRole::Peer,
Responsibility::individual(),
Sig::empty(),
);
let result = CoordinatedAction::builder()
.coordination_type(CoordinationType::Supervised)
.participant(p1)
.participant(p2)
.action(spec)
.protocol(CoordinationProtocol::TwoPhaseCommit)
.causal_context(test_causal_context())
.build_verified();
assert!(result.is_err());
}
#[test]
fn coordinated_action_build_verified_succeeds_with_valid_commitments() {
let coord_key = test_key();
let peer_key = test_key();
let spec = CoordinatedActionSpec::new("deploy");
let coord_commitment = coord_key.sign(&spec.canonical_bytes());
let peer_commitment = peer_key.sign(&spec.canonical_bytes());
let p1 = Participant::with_commitment(
coord_key.public_key(),
ParticipantRole::Coordinator,
Responsibility::individual(),
coord_commitment,
);
let p2 = Participant::with_commitment(
peer_key.public_key(),
ParticipantRole::Peer,
Responsibility::individual(),
peer_commitment,
);
let result = CoordinatedAction::builder()
.coordination_type(CoordinationType::Parallel)
.participant(p1)
.participant(p2)
.action(spec)
.protocol(CoordinationProtocol::TwoPhaseCommit)
.causal_context(test_causal_context())
.build_verified();
assert!(result.is_ok());
}
#[test]
fn coordination_event_started() {
let key = test_key();
let coordinator = Participant::new(
key.public_key(),
ParticipantRole::Coordinator,
Responsibility::individual(),
Sig::empty(),
);
let action = CoordinatedAction::builder()
.coordination_type(CoordinationType::Pipeline)
.participant(coordinator)
.action(CoordinatedActionSpec::new("Pipeline task"))
.protocol(CoordinationProtocol::LeaderFollower)
.causal_context(test_causal_context())
.started_now()
.build()
.unwrap();
let coord_id = action.id();
let event = CoordinationEvent::started(action);
assert_eq!(event.coordination_id(), coord_id);
}
#[test]
fn coordination_event_task_completed() {
let key = test_key();
let coord_id = CoordinationId::generate();
let task_id = TaskId::generate();
let event = CoordinationEvent::task_completed(
coord_id,
task_id,
key.public_key(),
ActionOutcome::success(serde_json::json!({"status": "done"})),
);
assert_eq!(event.coordination_id(), coord_id);
}
#[test]
fn coordination_event_message() {
let key1 = test_key();
let key2 = test_key();
let coord_id = CoordinationId::generate();
let event = CoordinationEvent::message(
coord_id,
key1.public_key(),
key2.public_key(),
hash(b"message content"),
);
assert_eq!(event.coordination_id(), coord_id);
}
#[test]
fn coordination_result_with_agent_outcomes() {
let key1 = test_key();
let key2 = test_key();
let result = CoordinationResult::new(
ActionOutcome::success(serde_json::json!({})),
serde_json::json!({"combined": true}),
CoordinationMetrics::new(5000),
)
.with_agent_outcome(
&key1.public_key(),
ActionOutcome::success(serde_json::json!({})),
)
.with_agent_outcome(
&key2.public_key(),
ActionOutcome::success(serde_json::json!({})),
);
assert!(result.is_success());
assert!(result.agent_outcome(&key1.public_key()).is_some());
assert!(result.agent_outcome(&key2.public_key()).is_some());
}
#[test]
fn coordination_metrics() {
let key = test_key();
let metrics = CoordinationMetrics::new(10000)
.with_agent_duration(&key.public_key(), 5000)
.with_overhead(500)
.with_retries(2);
assert_eq!(metrics.total_duration(), 10000);
assert_eq!(metrics.agent_duration(&key.public_key()), Some(5000));
assert_eq!(metrics.communication_overhead(), 500);
assert_eq!(metrics.retry_count(), 2);
}
#[test]
fn coordination_task_lookup_by_public_key() {
let key = test_key();
let task = Task::new("deploy-service");
let spec = CoordinatedActionSpec::new("deploy").with_tasks(&key.public_key(), vec![task]);
let tasks = spec.tasks_for(&key.public_key());
assert!(tasks.is_some());
assert_eq!(tasks.unwrap().len(), 1);
}
#[test]
fn coordination_task_lookup_different_key_returns_none() {
let key1 = test_key();
let key2 = test_key();
let task = Task::new("deploy-service");
let spec = CoordinatedActionSpec::new("deploy").with_tasks(&key1.public_key(), vec![task]);
assert!(spec.tasks_for(&key2.public_key()).is_none());
}
#[test]
fn coordination_result_agent_outcome_by_public_key() {
let key1 = test_key();
let key2 = test_key();
let result = CoordinationResult::new(
ActionOutcome::success(serde_json::json!({})),
serde_json::json!({}),
CoordinationMetrics::new(1000),
)
.with_agent_outcome(
&key1.public_key(),
ActionOutcome::success(serde_json::json!({})),
);
assert!(result.agent_outcome(&key1.public_key()).is_some());
assert!(result.agent_outcome(&key2.public_key()).is_none());
}
#[test]
fn coordination_metrics_agent_duration_by_public_key() {
let key1 = test_key();
let key2 = test_key();
let metrics = CoordinationMetrics::new(5000).with_agent_duration(&key1.public_key(), 3000);
assert_eq!(metrics.agent_duration(&key1.public_key()), Some(3000));
assert_eq!(metrics.agent_duration(&key2.public_key()), None);
}
}