use crate::{
application::services::projection::Projection, domain::entities::Event, error::Result,
};
use dashmap::DashMap;
use serde_json::{Value, json};
/// Projection that maintains one JSON status document per workflow entity.
///
/// Each document is a JSON object carrying at least `status`, `steps_total`,
/// `steps_completed` and `awaiting_approval`; `replicant_id`, `error` and
/// `output` are added by the corresponding workflow events.
pub struct WorkflowStatusProjection {
/// Current state document for each entity, keyed by the entity id string.
states: DashMap<String, Value>,
}
impl Default for WorkflowStatusProjection {
fn default() -> Self {
Self::new()
}
}
impl WorkflowStatusProjection {
    /// Creates a projection that tracks no workflows yet.
    pub fn new() -> Self {
        let states = DashMap::new();
        Self { states }
    }

    /// Returns a mutable handle to the state stored under `entity_id`.
    ///
    /// When no state exists yet, a placeholder document is inserted first:
    /// status `"unknown"`, both step counters at zero and
    /// `awaiting_approval` set to `false`.
    fn ensure_entry(&self, entity_id: &str) -> dashmap::mapref::one::RefMut<'_, String, Value> {
        // Only evaluated when the entity has no entry.
        let placeholder = || {
            json!({
                "status": "unknown",
                "steps_total": 0,
                "steps_completed": 0,
                "awaiting_approval": false,
            })
        };
        self.states
            .entry(entity_id.to_owned())
            .or_insert_with(placeholder)
    }
}
impl Projection for WorkflowStatusProjection {
fn name(&self) -> &'static str {
"workflow_status"
}
fn process(&self, event: &Event) -> Result<()> {
let entity_id = event.entity_id_str().to_string();
let event_type = event.event_type_str();
let payload = &event.payload;
match event_type {
"workflow.dispatched" => {
let steps_total = payload
.get("steps_total")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);
self.states.insert(
entity_id,
json!({
"status": "pending",
"steps_total": steps_total,
"steps_completed": 0,
"awaiting_approval": false,
}),
);
}
"workflow.claimed" => {
let mut state = self.ensure_entry(&entity_id);
let status = state.get("status").and_then(|s| s.as_str()).unwrap_or("");
#[allow(clippy::collapsible_if)]
if status == "pending" || status == "unknown" {
if let Some(rid) = payload.get("replicant_id") {
state["status"] = json!("claimed");
state["replicant_id"] = rid.clone();
}
}
}
"workflow.step.completed" => {
let mut state = self.ensure_entry(&entity_id);
let status = state.get("status").and_then(|s| s.as_str()).unwrap_or("");
if status == "claimed" || status == "running" || status == "unknown" {
let completed = state
.get("steps_completed")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0)
+ 1;
state["status"] = json!("running");
state["steps_completed"] = json!(completed);
}
}
"workflow.step.failed" => {
let mut state = self.ensure_entry(&entity_id);
state["status"] = json!("failed");
if let Some(err) = payload.get("error") {
state["error"] = err.clone();
}
}
"workflow.output.ready" => {
let mut state = self.ensure_entry(&entity_id);
state["status"] = json!("completed");
if let Some(result) = payload.get("result") {
state["output"] = result.clone();
}
}
"workflow.approval.requested" => {
let mut state = self.ensure_entry(&entity_id);
state["status"] = json!("awaiting_approval");
state["awaiting_approval"] = json!(true);
}
"workflow.approval.granted" => {
let mut state = self.ensure_entry(&entity_id);
state["status"] = json!("running");
state["awaiting_approval"] = json!(false);
}
"workflow.approval.rejected" => {
let mut state = self.ensure_entry(&entity_id);
state["status"] = json!("rejected");
state["awaiting_approval"] = json!(false);
}
_ => {}
}
Ok(())
}
fn get_state(&self, entity_id: &str) -> Option<Value> {
self.states.get(entity_id).map(|v| v.clone())
}
fn clear(&self) {
self.states.clear();
}
}
/// Projection that maintains one JSON record per replicant entity.
///
/// Each record is a JSON object with `status` (`"active"` or `"stale"`) and
/// `capabilities`; heartbeat events additionally set `last_heartbeat`.
pub struct ReplicantRegistryProjection {
/// Current record for each replicant, keyed by the entity id string.
states: DashMap<String, Value>,
}
impl Default for ReplicantRegistryProjection {
fn default() -> Self {
Self::new()
}
}
impl ReplicantRegistryProjection {
    /// Creates a registry that knows no replicants yet.
    pub fn new() -> Self {
        let states = DashMap::new();
        Self { states }
    }
}
impl Projection for ReplicantRegistryProjection {
    /// Returns the fixed projection name, `"replicant_registry"`.
    fn name(&self) -> &'static str {
        "replicant_registry"
    }

    /// Applies a single event to the record of the event's entity.
    ///
    /// Handled event types:
    /// - `replicant.registered`: replaces the record with status `"active"`
    ///   and the payload's `capabilities` (an empty array when absent).
    /// - `replicant.heartbeat`: creates the record if needed, sets status
    ///   `"active"` and stores the event timestamp (RFC 3339) as
    ///   `last_heartbeat`.
    /// - `replicant.stale`: creates the record if needed and sets status
    ///   `"stale"`.
    ///
    /// Records created on demand start with an empty `capabilities` array.
    /// All other event types are ignored. This method always returns `Ok(())`.
    fn process(&self, event: &Event) -> Result<()> {
        let replicant_id = event.entity_id_str().to_string();
        let data = &event.payload;

        match event.event_type_str() {
            "replicant.registered" => {
                let capabilities = data
                    .get("capabilities")
                    .cloned()
                    .unwrap_or_else(|| json!([]));
                let record = json!({
                    "status": "active",
                    "capabilities": capabilities,
                });
                self.states.insert(replicant_id, record);
            }
            "replicant.heartbeat" => {
                let slot = self.states.entry(replicant_id);
                let mut record =
                    slot.or_insert_with(|| json!({"status": "active", "capabilities": []}));
                record["status"] = json!("active");
                let seen_at = event.timestamp().to_rfc3339();
                record["last_heartbeat"] = json!(seen_at);
            }
            "replicant.stale" => {
                let slot = self.states.entry(replicant_id);
                let mut record =
                    slot.or_insert_with(|| json!({"status": "stale", "capabilities": []}));
                record["status"] = json!("stale");
            }
            _ => {}
        }

        Ok(())
    }

    /// Returns a clone of the record for `entity_id`, or `None` when the
    /// replicant is not tracked.
    fn get_state(&self, entity_id: &str) -> Option<Value> {
        let record = self.states.get(entity_id)?;
        Some(record.value().clone())
    }

    /// Removes every tracked replicant record.
    fn clear(&self) {
        self.states.clear();
    }
}
/// Projection that tracks which workflow entities are currently pending.
///
/// An entity is added on `workflow.dispatched` and removed again on
/// `workflow.claimed`, `workflow.output.ready` or `workflow.step.failed`.
pub struct TaskQueueProjection {
/// Ids of the pending entities; the map is used as a set, so values are `()`.
pending: DashMap<String, ()>,
}
impl Default for TaskQueueProjection {
fn default() -> Self {
Self::new()
}
}
impl TaskQueueProjection {
    /// Creates a queue with no pending entities.
    pub fn new() -> Self {
        let pending = DashMap::new();
        Self { pending }
    }
}
impl Projection for TaskQueueProjection {
fn name(&self) -> &'static str {
"task_queue"
}
fn process(&self, event: &Event) -> Result<()> {
let entity_id = event.entity_id_str().to_string();
let event_type = event.event_type_str();
match event_type {
"workflow.dispatched" => {
self.pending.insert(entity_id, ());
}
"workflow.claimed" | "workflow.output.ready" | "workflow.step.failed" => {
self.pending.remove(&entity_id);
}
_ => {}
}
Ok(())
}
fn get_state(&self, entity_id: &str) -> Option<Value> {
if entity_id == "__all" {
let pending: Vec<Value> = self
.pending
.iter()
.map(|entry| json!(entry.key().clone()))
.collect();
Some(json!({ "pending": pending }))
} else if self.pending.contains_key(entity_id) {
Some(json!({ "status": "pending" }))
} else {
None
}
}
fn clear(&self) {
self.pending.clear();
}
}