pub mod publisher;
pub use publisher::{OpControlEnvelope, OpControlPublisher, SharedOpControlPublisher};
use aa_proto::assembly::common::v1::AgentId;
use aa_proto::assembly::policy::v1::OpControlSignal;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
// Lifecycle state of an operation tracked by `OpsRegistry`.
//
// Variants are serialized with snake_case names (`pending`, `running`, ...).
// NOTE(review): plain `//` comments are used on purpose — `///` text on a `ToSchema`
// type is presumably picked up as the schema description; switch to `///` if that is wanted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum OpState {
// Created by `ingest`; becomes `Running` via `allow`.
Pending,
// Created by `register`, or reached via `allow` / `resume`.
Running,
// Reached from `Running` via `pause`; left again via `resume`.
Paused,
// Reached from `Running` via `complete`; eligible for removal by `sweep`.
Completing,
// Reached from `Pending`, `Running` or `Paused` via `terminate`; eligible for removal by `sweep`.
Terminated,
}
// Snapshot of one operation as stored in (and returned by) `OpsRegistry`.
//
// NOTE(review): plain `//` comments are used on purpose — `///` text on a `ToSchema`
// type is presumably picked up as the schema description; switch to `///` if that is wanted.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct OpRecord {
// Identifier the record is stored under in the registry.
pub op_id: String,
// Current lifecycle state.
pub state: OpState,
// RFC 3339 timestamp taken when the record was first created.
pub registered_at: String,
// RFC 3339 timestamp of the most recent state change; `sweep` measures age from it.
pub updated_at: String,
}
/// Failure modes of the state-changing operations on the ops registry.
#[derive(Debug, PartialEq, Eq)]
pub enum OpsError {
    /// No operation is stored under the requested op id.
    NotFound,
    /// The operation exists but its current state does not permit the requested transition.
    InvalidTransition,
}

impl std::fmt::Display for OpsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            OpsError::NotFound => "operation not found",
            OpsError::InvalidTransition => "invalid state transition for operation",
        };
        f.write_str(text)
    }
}

// Implementing `Error` lets callers box the value or propagate it with `?` into
// generic error types instead of matching on it by hand.
impl std::error::Error for OpsError {}
/// In-memory registry of operations and their lifecycle state.
///
/// Both maps are keyed by op id. All methods take `&self`; mutation goes through the maps.
pub struct OpsRegistry {
/// Operation records keyed by op id.
ops: DashMap<String, OpRecord>,
/// Agent recorded for an op by `ingest_with_agent`, keyed by op id.
agents: DashMap<String, AgentId>,
/// Publisher used for pause/resume/terminate signals; nothing is published when `None`.
publisher: Option<SharedOpControlPublisher>,
}
impl Default for OpsRegistry {
fn default() -> Self {
Self::new()
}
}
impl OpsRegistry {
/// Creates an empty registry with no control-signal publisher attached.
pub fn new() -> Self {
    let ops = DashMap::new();
    let agents = DashMap::new();
    Self {
        ops,
        agents,
        publisher: None,
    }
}
pub fn with_publisher(mut self, publisher: SharedOpControlPublisher) -> Self {
self.publisher = Some(publisher);
self
}
/// Stores `op_id` directly in the `Running` state and returns the stored record.
///
/// Any record already held under the same id is replaced. Both timestamps are set to the
/// current UTC time in RFC 3339 form.
pub fn register(&self, op_id: String) -> OpRecord {
    let timestamp = chrono::Utc::now().to_rfc3339();
    let created = OpRecord {
        op_id,
        state: OpState::Running,
        registered_at: timestamp.clone(),
        updated_at: timestamp,
    };
    self.ops.insert(created.op_id.clone(), created.clone());
    created
}
/// Records `op_id` as `Pending` unless it is already tracked.
///
/// When a record already exists it is returned unchanged (whatever state it is in), so
/// repeated calls are idempotent. Otherwise a new `Pending` record stamped with the
/// current UTC time is stored and returned.
pub fn ingest(&self, op_id: String) -> OpRecord {
    // Fast path: an existing record only needs a shared lookup.
    if let Some(existing) = self.ops.get(&op_id) {
        return existing.value().clone();
    }
    // Slow path: insert through the entry API so the "is it absent?" check and the insert
    // happen under one shard lock. A plain `insert` here could overwrite a record that a
    // concurrent caller created (and possibly already advanced) after the lookup above.
    let key = op_id.clone();
    let stored = self.ops.entry(key).or_insert_with(|| {
        let now = chrono::Utc::now().to_rfc3339();
        OpRecord {
            op_id,
            state: OpState::Pending,
            registered_at: now.clone(),
            updated_at: now,
        }
    });
    stored.value().clone()
}
/// Same as `ingest`, but first records `agent_id` as the agent for `op_id`.
///
/// The agent mapping is written unconditionally, so it replaces any agent previously
/// recorded for the same op id even when the op record itself already exists.
pub fn ingest_with_agent(&self, op_id: String, agent_id: AgentId) -> OpRecord {
    let agent_key = op_id.clone();
    self.agents.insert(agent_key, agent_id);
    self.ingest(op_id)
}
/// Publishes `signal` for `op_id` when a publisher is configured and an agent is recorded
/// for the op; does nothing otherwise.
fn maybe_publish(&self, op_id: &str, signal: OpControlSignal) {
    if let Some(publisher) = self.publisher.as_ref() {
        // Clone the agent out so the map guard is released at the end of this statement,
        // before calling into the publisher. The state-changing callers release their
        // `ops` guard before publishing for the same reason.
        let agent = self.agents.get(op_id).map(|entry| entry.value().clone());
        if let Some(agent) = agent {
            publisher.publish(agent, op_id.to_string(), signal);
        }
    }
}
/// Returns a copy of the record stored under `op_id`, or `None` when it is not tracked.
pub fn get(&self, op_id: &str) -> Option<OpRecord> {
    let found = self.ops.get(op_id)?;
    Some(found.value().clone())
}
/// Returns a copy of the agent recorded for `op_id`, or `None` when no agent is recorded.
pub fn agent_for(&self, op_id: &str) -> Option<AgentId> {
    let recorded = self.agents.get(op_id)?;
    Some(recorded.value().clone())
}
/// Returns copies of all records currently in the registry, in map iteration order.
pub fn list(&self) -> Vec<OpRecord> {
    let mut snapshot = Vec::new();
    for entry in self.ops.iter() {
        snapshot.push(entry.value().clone());
    }
    snapshot
}
/// Moves a `Pending` operation to `Running` and returns the updated record.
///
/// # Errors
/// * `OpsError::NotFound` when `op_id` is not tracked.
/// * `OpsError::InvalidTransition` when the operation is in any state other than `Pending`.
pub fn allow(&self, op_id: &str) -> Result<OpRecord, OpsError> {
    let mut record = match self.ops.get_mut(op_id) {
        Some(record) => record,
        None => return Err(OpsError::NotFound),
    };
    // Every variant is listed so that adding a state forces a decision here.
    match record.state {
        OpState::Running | OpState::Paused | OpState::Completing | OpState::Terminated => {
            return Err(OpsError::InvalidTransition);
        }
        OpState::Pending => {}
    }
    record.state = OpState::Running;
    record.updated_at = chrono::Utc::now().to_rfc3339();
    Ok(record.value().clone())
}
/// Moves a `Running` operation to `Paused`, then publishes a `Pause` signal (if a
/// publisher and an agent are available) and returns the updated record.
///
/// # Errors
/// * `OpsError::NotFound` when `op_id` is not tracked.
/// * `OpsError::InvalidTransition` when the operation is not `Running`.
pub fn pause(&self, op_id: &str) -> Result<OpRecord, OpsError> {
    // The map guard is owned by the closure, so it is released before publishing.
    let paused = self
        .ops
        .get_mut(op_id)
        .ok_or(OpsError::NotFound)
        .and_then(|mut record| match record.state {
            OpState::Running => {
                record.state = OpState::Paused;
                record.updated_at = chrono::Utc::now().to_rfc3339();
                Ok(record.value().clone())
            }
            OpState::Pending | OpState::Paused | OpState::Completing | OpState::Terminated => {
                Err(OpsError::InvalidTransition)
            }
        })?;
    self.maybe_publish(op_id, OpControlSignal::Pause);
    Ok(paused)
}
/// Moves a `Paused` operation back to `Running`, then publishes a `Resume` signal (if a
/// publisher and an agent are available) and returns the updated record.
///
/// # Errors
/// * `OpsError::NotFound` when `op_id` is not tracked.
/// * `OpsError::InvalidTransition` when the operation is not `Paused`.
pub fn resume(&self, op_id: &str) -> Result<OpRecord, OpsError> {
    // The map guard is owned by the closure, so it is released before publishing.
    let resumed = self
        .ops
        .get_mut(op_id)
        .ok_or(OpsError::NotFound)
        .and_then(|mut record| match record.state {
            OpState::Paused => {
                record.state = OpState::Running;
                record.updated_at = chrono::Utc::now().to_rfc3339();
                Ok(record.value().clone())
            }
            OpState::Pending | OpState::Running | OpState::Completing | OpState::Terminated => {
                Err(OpsError::InvalidTransition)
            }
        })?;
    self.maybe_publish(op_id, OpControlSignal::Resume);
    Ok(resumed)
}
/// Moves a `Running` operation to `Completing` and returns the updated record.
/// No control signal is published for this transition.
///
/// # Errors
/// * `OpsError::NotFound` when `op_id` is not tracked.
/// * `OpsError::InvalidTransition` when the operation is not `Running`.
pub fn complete(&self, op_id: &str) -> Result<OpRecord, OpsError> {
    let mut record = match self.ops.get_mut(op_id) {
        Some(record) => record,
        None => return Err(OpsError::NotFound),
    };
    // Every variant is listed so that adding a state forces a decision here.
    match record.state {
        OpState::Pending | OpState::Paused | OpState::Completing | OpState::Terminated => {
            return Err(OpsError::InvalidTransition);
        }
        OpState::Running => {}
    }
    record.state = OpState::Completing;
    record.updated_at = chrono::Utc::now().to_rfc3339();
    Ok(record.value().clone())
}
/// Terminates an operation and returns its record.
///
/// `Pending`, `Running` and `Paused` operations move to `Terminated` and a `Terminate`
/// signal is published (if a publisher and an agent are available). Operations already
/// `Completing` or `Terminated` are returned untouched and nothing is published.
///
/// # Errors
/// * `OpsError::NotFound` when `op_id` is not tracked.
pub fn terminate(&self, op_id: &str) -> Result<OpRecord, OpsError> {
    let mut should_signal = false;
    let snapshot = {
        // Scoped so the map guard is released before publishing.
        let mut record = self.ops.get_mut(op_id).ok_or(OpsError::NotFound)?;
        match record.state {
            OpState::Completing | OpState::Terminated => {}
            OpState::Pending | OpState::Running | OpState::Paused => {
                record.state = OpState::Terminated;
                record.updated_at = chrono::Utc::now().to_rfc3339();
                should_signal = true;
            }
        }
        record.value().clone()
    };
    if should_signal {
        self.maybe_publish(op_id, OpControlSignal::Terminate);
    }
    Ok(snapshot)
}
pub fn sweep(&self, ttl_seconds: i64) -> usize {
let now = chrono::Utc::now();
let mut removed = 0usize;
let keys: Vec<String> = self
.ops
.iter()
.filter(|r| matches!(r.state, OpState::Completing | OpState::Terminated))
.map(|r| r.op_id.clone())
.collect();
for op_id in keys {
let too_old = self
.ops
.get(&op_id)
.and_then(|r| chrono::DateTime::parse_from_rfc3339(&r.updated_at).ok())
.map(|ts| (now - ts.with_timezone(&chrono::Utc)).num_seconds() >= ttl_seconds)
.unwrap_or(false);
if too_old {
self.ops.remove(&op_id);
self.agents.remove(&op_id);
removed += 1;
}
}
removed
}
}
/// Spawns the background sweep task with the default settings: a sweep every 10 seconds,
/// evicting finished operations that are at least 60 seconds old.
///
/// See `spawn_sweep_task_with` for the behaviour of the spawned task.
pub fn spawn_sweep_task(registry: std::sync::Arc<OpsRegistry>) -> tokio::task::JoinHandle<()> {
    const DEFAULT_TICK: std::time::Duration = std::time::Duration::from_secs(10);
    const DEFAULT_TTL_SECONDS: i64 = 60;
    spawn_sweep_task_with(registry, DEFAULT_TICK, DEFAULT_TTL_SECONDS)
}
/// Spawns a Tokio task that repeatedly sleeps for `tick` and then calls
/// `registry.sweep(ttl_seconds)`, logging at debug level whenever entries were removed.
///
/// The loop has no exit condition of its own and the task owns a clone of the `Arc`, so
/// the registry stays alive for as long as the task runs. The returned handle is the only
/// way this function offers to stop it.
pub fn spawn_sweep_task_with(
    registry: std::sync::Arc<OpsRegistry>,
    tick: std::time::Duration,
    ttl_seconds: i64,
) -> tokio::task::JoinHandle<()> {
    let sweeper = async move {
        loop {
            tokio::time::sleep(tick).await;
            match registry.sweep(ttl_seconds) {
                0 => {}
                swept => {
                    tracing::debug!(swept, "OpsRegistry sweep dropped entries");
                }
            }
        }
    };
    tokio::spawn(sweeper)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Duration;

    /// How long the "nothing was published" checks wait before treating the channel as silent.
    const QUIET_PERIOD: Duration = Duration::from_millis(50);

    /// Builds an `AgentId` in a fixed org/team with the given agent id.
    fn agent(id: &str) -> AgentId {
        AgentId {
            org_id: "org".into(),
            team_id: "team".into(),
            agent_id: id.into(),
        }
    }

    /// Builds a registry wired to a fresh publisher and hands back both.
    fn registry_with_publisher() -> (OpsRegistry, Arc<OpControlPublisher>) {
        let publisher = Arc::new(OpControlPublisher::new());
        let registry = OpsRegistry::new().with_publisher(Arc::clone(&publisher));
        (registry, publisher)
    }

    // ---- ingest / allow -------------------------------------------------

    #[test]
    fn ingest_creates_pending_entry() {
        let op_id = "trace-1:span-1";
        let registry = OpsRegistry::new();

        let created = registry.ingest(op_id.to_string());

        assert_eq!(created.op_id, "trace-1:span-1");
        assert_eq!(created.state, OpState::Pending);
        assert_eq!(registry.get(op_id).unwrap().state, OpState::Pending);
    }

    #[test]
    fn ingest_is_idempotent_and_preserves_later_state() {
        let registry = OpsRegistry::new();

        let original = registry.ingest("op-1".to_string());
        registry.allow("op-1").unwrap();
        let again = registry.ingest("op-1".to_string());

        assert_eq!(again.state, OpState::Running);
        assert_eq!(again.registered_at, original.registered_at);
    }

    #[test]
    fn allow_transitions_pending_to_running() {
        let registry = OpsRegistry::new();
        registry.ingest("op-1".to_string());

        let allowed = registry.allow("op-1").unwrap();

        assert_eq!(allowed.state, OpState::Running);
    }

    #[test]
    fn allow_rejects_non_pending_states() {
        let registry = OpsRegistry::new();
        registry.register("running-op".to_string());
        registry.ingest("paused-op".to_string());
        registry.allow("paused-op").unwrap();
        registry.pause("paused-op").unwrap();

        for op_id in ["running-op", "paused-op"] {
            assert_eq!(registry.allow(op_id).unwrap_err(), OpsError::InvalidTransition);
        }
    }

    #[test]
    fn allow_unknown_op_returns_not_found() {
        let registry = OpsRegistry::new();

        let outcome = registry.allow("never-ingested");

        assert_eq!(outcome.unwrap_err(), OpsError::NotFound);
    }

    // ---- complete -------------------------------------------------------

    #[test]
    fn complete_transitions_running_to_completing() {
        let registry = OpsRegistry::new();
        registry.register("op-1".to_string());

        let completed = registry.complete("op-1").unwrap();

        assert_eq!(completed.state, OpState::Completing);
    }

    #[test]
    fn complete_rejects_non_running_states() {
        let registry = OpsRegistry::new();
        registry.ingest("pending-op".to_string());
        registry.register("paused-op".to_string());
        registry.pause("paused-op").unwrap();
        registry.register("terminated-op".to_string());
        registry.terminate("terminated-op").unwrap();

        for op_id in ["pending-op", "paused-op", "terminated-op"] {
            assert_eq!(
                registry.complete(op_id).unwrap_err(),
                OpsError::InvalidTransition
            );
        }
    }

    #[test]
    fn complete_unknown_op_returns_not_found() {
        let registry = OpsRegistry::new();

        let outcome = registry.complete("never-registered");

        assert_eq!(outcome.unwrap_err(), OpsError::NotFound);
    }

    // ---- terminate ------------------------------------------------------

    #[test]
    fn terminate_absorbs_pending_into_terminated() {
        let registry = OpsRegistry::new();
        registry.ingest("op-1".to_string());

        let terminated = registry.terminate("op-1").unwrap();

        assert_eq!(terminated.state, OpState::Terminated);
    }

    #[test]
    fn terminate_is_idempotent_on_completing() {
        let registry = OpsRegistry::new();
        registry.register("op-1".to_string());
        registry.complete("op-1").unwrap();

        let unchanged = registry.terminate("op-1").unwrap();

        assert_eq!(unchanged.state, OpState::Completing);
    }

    // ---- control-signal publishing ----------------------------------------
    // In each test the position of `subscribe()` relative to the transitions is deliberate.

    #[tokio::test]
    async fn pause_publishes_pause_signal_to_subscribed_agent() {
        let (registry, publisher) = registry_with_publisher();
        let mut signals = publisher.subscribe();

        registry.ingest_with_agent("op-1".to_string(), agent("a1"));
        registry.allow("op-1").unwrap();
        registry.pause("op-1").unwrap();

        let received = signals.recv().await.unwrap();
        assert_eq!(received.message.op_id, "op-1");
        assert_eq!(received.message.signal, OpControlSignal::Pause as i32);
        assert_eq!(received.agent_id.agent_id, "a1");
    }

    #[tokio::test]
    async fn resume_publishes_resume_signal() {
        let (registry, publisher) = registry_with_publisher();
        registry.ingest_with_agent("op-2".to_string(), agent("a1"));
        registry.allow("op-2").unwrap();
        registry.pause("op-2").unwrap();

        // Subscribe only now, so the first message received is the one from `resume`.
        let mut signals = publisher.subscribe();
        registry.resume("op-2").unwrap();

        let received = signals.recv().await.unwrap();
        assert_eq!(received.message.signal, OpControlSignal::Resume as i32);
    }

    #[tokio::test]
    async fn terminate_publishes_terminate_signal_only_on_active_states() {
        let (registry, publisher) = registry_with_publisher();
        registry.ingest_with_agent("op-3".to_string(), agent("a1"));
        registry.allow("op-3").unwrap();
        let mut signals = publisher.subscribe();

        registry.terminate("op-3").unwrap();
        let received = signals.recv().await.unwrap();
        assert_eq!(received.message.signal, OpControlSignal::Terminate as i32);

        // A second terminate is a no-op and must stay silent.
        registry.terminate("op-3").unwrap();
        let second = tokio::time::timeout(QUIET_PERIOD, signals.recv()).await;
        assert!(
            second.is_err(),
            "terminate on already-terminated op must not re-publish",
        );
    }

    #[tokio::test]
    async fn no_publish_when_op_has_no_agent_id() {
        let (registry, publisher) = registry_with_publisher();
        let mut signals = publisher.subscribe();

        registry.ingest("op-4".to_string());
        registry.allow("op-4").unwrap();
        registry.pause("op-4").unwrap();

        let outcome = tokio::time::timeout(QUIET_PERIOD, signals.recv()).await;
        assert!(
            outcome.is_err(),
            "transitions on agent-less ops must not publish",
        );
    }

    // ---- sweep ------------------------------------------------------------

    #[test]
    fn sweep_removes_terminated_entries_older_than_ttl() {
        let registry = OpsRegistry::new();

        registry.register("op-old".to_string());
        registry.terminate("op-old").unwrap();
        // Backdate the record so it is older than the TTL used below.
        let backdated = (chrono::Utc::now() - chrono::Duration::seconds(120)).to_rfc3339();
        registry.ops.alter("op-old", |_, record| OpRecord {
            updated_at: backdated,
            ..record
        });

        registry.register("op-fresh".to_string());
        registry.terminate("op-fresh").unwrap();

        let swept = registry.sweep(60);

        assert_eq!(swept, 1);
        assert!(registry.get("op-old").is_none());
        assert!(registry.get("op-fresh").is_some());
    }

    #[test]
    fn sweep_leaves_running_and_paused_alone() {
        let registry = OpsRegistry::new();
        registry.register("running".to_string());
        registry.register("paused".to_string());
        registry.pause("paused").unwrap();

        let swept = registry.sweep(0);

        assert_eq!(swept, 0);
        for op_id in ["running", "paused"] {
            assert!(registry.get(op_id).is_some());
        }
    }
}