use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::broadcast;
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
#[derive(Debug)]
struct WorkspaceInner {
root: PathBuf,
metadata: WorkspaceMetadata,
tx: broadcast::Sender<CollaborationEvent>,
event_log: RwLock<Vec<CollaborationEvent>>,
}
#[derive(Debug, Clone)]
pub struct Workspace {
inner: Arc<WorkspaceInner>,
}
impl Workspace {
#[allow(clippy::new_ret_no_self)]
pub fn new(root: impl Into<PathBuf>) -> WorkspaceBuilder {
WorkspaceBuilder {
root: root.into(),
project_name: None,
session_id: None,
created_at: None,
channel_capacity: DEFAULT_CHANNEL_CAPACITY,
}
}
pub fn root(&self) -> &PathBuf {
&self.inner.root
}
pub fn metadata(&self) -> &WorkspaceMetadata {
&self.inner.metadata
}
pub fn publish(&self, event: CollaborationEvent) -> usize {
if let Ok(mut log) = self.inner.event_log.write() {
log.push(event.clone());
}
self.inner.tx.send(event).unwrap_or(0)
}
pub fn subscribe(&self) -> broadcast::Receiver<CollaborationEvent> {
self.inner.tx.subscribe()
}
pub async fn wait_for(
&self,
correlation_id: &str,
timeout: Duration,
) -> Option<CollaborationEvent> {
let mut rx = self.subscribe();
let deadline = tokio::time::sleep(timeout);
tokio::pin!(deadline);
loop {
tokio::select! {
result = rx.recv() => {
match result {
Ok(event) if event.correlation_id == correlation_id => {
return Some(event);
}
Ok(_) => {
continue;
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(
skipped,
"workspace subscriber lagged, {skipped} events dropped"
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
return None;
}
}
}
() = &mut deadline => {
return None;
}
}
}
}
pub fn events(&self) -> Vec<CollaborationEvent> {
self.inner.event_log.read().map(|log| log.clone()).unwrap_or_default()
}
pub fn request_work(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
) -> CollaborationEvent {
let event = CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::NeedWork,
);
self.publish(event.clone());
event
}
pub fn claim_work(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
) {
self.publish(CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::WorkClaimed,
));
}
pub fn publish_work(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
payload: Value,
) {
self.publish(
CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::WorkPublished,
)
.payload(payload),
);
}
pub fn request_feedback(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
payload: Value,
) {
self.publish(
CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::FeedbackRequested,
)
.payload(payload),
);
}
pub fn provide_feedback(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
payload: Value,
) {
self.publish(
CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::FeedbackProvided,
)
.payload(payload),
);
}
pub fn signal_blocked(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
payload: Value,
) {
self.publish(
CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::Blocked,
)
.payload(payload),
);
}
pub fn signal_completed(
&self,
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
) {
self.publish(CollaborationEvent::new(
correlation_id,
topic,
producer,
CollaborationEventKind::Completed,
));
}
pub async fn wait_for_work(
&self,
correlation_id: &str,
timeout: Duration,
) -> Option<CollaborationEvent> {
self.wait_for_kind(correlation_id, CollaborationEventKind::WorkPublished, timeout).await
}
pub async fn wait_for_feedback(
&self,
correlation_id: &str,
timeout: Duration,
) -> Option<CollaborationEvent> {
self.wait_for_kind(correlation_id, CollaborationEventKind::FeedbackProvided, timeout).await
}
pub async fn wait_for_kind(
&self,
correlation_id: &str,
kind: CollaborationEventKind,
timeout: Duration,
) -> Option<CollaborationEvent> {
let mut rx = self.subscribe();
let deadline = tokio::time::sleep(timeout);
tokio::pin!(deadline);
loop {
tokio::select! {
result = rx.recv() => {
match result {
Ok(event)
if event.correlation_id == correlation_id
&& event.kind == kind =>
{
return Some(event);
}
Ok(_) => continue,
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(
skipped,
"workspace subscriber lagged, {skipped} events dropped"
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
return None;
}
}
}
() = &mut deadline => {
return None;
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct WorkspaceBuilder {
root: PathBuf,
project_name: Option<String>,
session_id: Option<String>,
created_at: Option<u64>,
channel_capacity: usize,
}
impl WorkspaceBuilder {
pub fn project_name(mut self, name: impl Into<String>) -> Self {
self.project_name = Some(name.into());
self
}
pub fn session_id(mut self, id: impl Into<String>) -> Self {
self.session_id = Some(id.into());
self
}
pub fn created_at(mut self, timestamp: u64) -> Self {
self.created_at = Some(timestamp);
self
}
pub fn channel_capacity(mut self, capacity: usize) -> Self {
self.channel_capacity = capacity;
self
}
pub fn build(self) -> Workspace {
let project_name = self.project_name.unwrap_or_else(|| {
self.root.file_name().and_then(|n| n.to_str()).unwrap_or("unnamed").to_string()
});
let (tx, _rx) = broadcast::channel(self.channel_capacity);
Workspace {
inner: Arc::new(WorkspaceInner {
root: self.root,
metadata: WorkspaceMetadata {
project_name,
session_id: self.session_id,
created_at: self.created_at,
},
tx,
event_log: RwLock::new(Vec::new()),
}),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkspaceMetadata {
pub project_name: String,
pub session_id: Option<String>,
pub created_at: Option<u64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CollaborationEventKind {
NeedWork,
WorkClaimed,
WorkPublished,
FeedbackRequested,
FeedbackProvided,
Blocked,
Completed,
}
impl std::fmt::Display for CollaborationEventKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NeedWork => write!(f, "NeedWork"),
Self::WorkClaimed => write!(f, "WorkClaimed"),
Self::WorkPublished => write!(f, "WorkPublished"),
Self::FeedbackRequested => write!(f, "FeedbackRequested"),
Self::FeedbackProvided => write!(f, "FeedbackProvided"),
Self::Blocked => write!(f, "Blocked"),
Self::Completed => write!(f, "Completed"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CollaborationEvent {
pub correlation_id: String,
pub topic: String,
pub producer: String,
pub consumer: Option<String>,
pub kind: CollaborationEventKind,
pub payload: Value,
pub timestamp: u64,
}
impl CollaborationEvent {
pub fn new(
correlation_id: impl Into<String>,
topic: impl Into<String>,
producer: impl Into<String>,
kind: CollaborationEventKind,
) -> Self {
Self {
correlation_id: correlation_id.into(),
topic: topic.into(),
producer: producer.into(),
consumer: None,
kind,
payload: Value::Null,
timestamp: 0,
}
}
pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
self.consumer = Some(consumer.into());
self
}
pub fn payload(mut self, payload: Value) -> Self {
self.payload = payload;
self
}
pub fn timestamp(mut self, ts: u64) -> Self {
self.timestamp = ts;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn workspace_builder_defaults_project_name_from_root() {
let ws = Workspace::new("/tmp/my-project").build();
assert_eq!(ws.root(), &PathBuf::from("/tmp/my-project"));
assert_eq!(ws.metadata().project_name, "my-project");
assert_eq!(ws.metadata().session_id, None);
assert_eq!(ws.metadata().created_at, None);
}
#[test]
fn workspace_builder_with_all_fields() {
let ws = Workspace::new("./demo")
.project_name("demo-site")
.session_id("sess-abc")
.created_at(1719000000)
.build();
assert_eq!(ws.root(), &PathBuf::from("./demo"));
assert_eq!(ws.metadata().project_name, "demo-site");
assert_eq!(ws.metadata().session_id.as_deref(), Some("sess-abc"));
assert_eq!(ws.metadata().created_at, Some(1719000000));
}
#[test]
fn workspace_builder_unnamed_fallback() {
let ws = Workspace::new("/").build();
assert_eq!(ws.metadata().project_name, "unnamed");
}
#[test]
fn workspace_clone_shares_transport() {
let ws1 = Workspace::new("./proj").build();
let ws2 = ws1.clone();
let mut rx = ws2.subscribe();
ws1.publish(CollaborationEvent::new(
"c1",
"topic",
"producer",
CollaborationEventKind::WorkPublished,
));
let event = rx.try_recv().expect("should receive event from clone");
assert_eq!(event.correlation_id, "c1");
}
#[test]
fn publish_with_no_subscribers_returns_zero() {
let ws = Workspace::new("./proj").build();
let count = ws.publish(CollaborationEvent::new(
"c1",
"topic",
"producer",
CollaborationEventKind::NeedWork,
));
assert_eq!(count, 0);
}
#[test]
fn publish_with_subscriber_returns_count() {
let ws = Workspace::new("./proj").build();
let _rx1 = ws.subscribe();
let _rx2 = ws.subscribe();
let count = ws.publish(CollaborationEvent::new(
"c1",
"topic",
"producer",
CollaborationEventKind::NeedWork,
));
assert_eq!(count, 2);
}
#[test]
fn subscribe_receives_published_events() {
let ws = Workspace::new("./proj").build();
let mut rx = ws.subscribe();
ws.publish(CollaborationEvent::new(
"c1",
"api",
"backend",
CollaborationEventKind::WorkPublished,
));
ws.publish(CollaborationEvent::new(
"c2",
"schema",
"db",
CollaborationEventKind::Completed,
));
let e1 = rx.try_recv().unwrap();
assert_eq!(e1.correlation_id, "c1");
assert_eq!(e1.kind, CollaborationEventKind::WorkPublished);
let e2 = rx.try_recv().unwrap();
assert_eq!(e2.correlation_id, "c2");
assert_eq!(e2.kind, CollaborationEventKind::Completed);
}
#[tokio::test]
async fn wait_for_returns_matching_event() {
let ws = Workspace::new("./proj").build();
let ws_clone = ws.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
ws_clone.publish(CollaborationEvent::new(
"other",
"unrelated",
"someone",
CollaborationEventKind::NeedWork,
));
ws_clone.publish(
CollaborationEvent::new(
"target",
"api",
"backend",
CollaborationEventKind::WorkPublished,
)
.payload(serde_json::json!({ "done": true })),
);
});
let result = ws.wait_for("target", Duration::from_secs(1)).await;
let event = result.expect("should receive matching event");
assert_eq!(event.correlation_id, "target");
assert_eq!(event.kind, CollaborationEventKind::WorkPublished);
assert_eq!(event.payload, serde_json::json!({ "done": true }));
}
#[tokio::test]
async fn wait_for_times_out_when_no_match() {
let ws = Workspace::new("./proj").build();
let result = ws.wait_for("nonexistent", Duration::from_millis(50)).await;
assert!(result.is_none());
}
#[tokio::test]
async fn wait_for_ignores_non_matching_events() {
let ws = Workspace::new("./proj").build();
let ws_clone = ws.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(5)).await;
for i in 0..5 {
ws_clone.publish(CollaborationEvent::new(
format!("wrong-{i}"),
"topic",
"producer",
CollaborationEventKind::NeedWork,
));
}
ws_clone.publish(CollaborationEvent::new(
"right",
"topic",
"producer",
CollaborationEventKind::WorkPublished,
));
});
let result = ws.wait_for("right", Duration::from_secs(1)).await;
assert!(result.is_some());
assert_eq!(result.unwrap().correlation_id, "right");
}
#[test]
fn events_returns_buffered_events() {
let ws = Workspace::new("./proj").channel_capacity(16).build();
ws.publish(CollaborationEvent::new("c1", "t1", "p1", CollaborationEventKind::NeedWork));
ws.publish(CollaborationEvent::new("c2", "t2", "p2", CollaborationEventKind::Completed));
let events = ws.events();
assert_eq!(events.len(), 2);
assert_eq!(events[0].correlation_id, "c1");
assert_eq!(events[1].correlation_id, "c2");
}
#[test]
fn collaboration_event_kind_display() {
assert_eq!(CollaborationEventKind::NeedWork.to_string(), "NeedWork");
assert_eq!(CollaborationEventKind::WorkClaimed.to_string(), "WorkClaimed");
assert_eq!(CollaborationEventKind::WorkPublished.to_string(), "WorkPublished");
assert_eq!(CollaborationEventKind::FeedbackRequested.to_string(), "FeedbackRequested");
assert_eq!(CollaborationEventKind::FeedbackProvided.to_string(), "FeedbackProvided");
assert_eq!(CollaborationEventKind::Blocked.to_string(), "Blocked");
assert_eq!(CollaborationEventKind::Completed.to_string(), "Completed");
}
#[test]
fn collaboration_event_new_defaults() {
let event = CollaborationEvent::new(
"corr-1",
"backend-api",
"coordinator",
CollaborationEventKind::NeedWork,
);
assert_eq!(event.correlation_id, "corr-1");
assert_eq!(event.topic, "backend-api");
assert_eq!(event.producer, "coordinator");
assert_eq!(event.consumer, None);
assert_eq!(event.kind, CollaborationEventKind::NeedWork);
assert_eq!(event.payload, Value::Null);
assert_eq!(event.timestamp, 0);
}
#[test]
fn collaboration_event_builder_methods() {
let event = CollaborationEvent::new(
"corr-2",
"api-routes",
"backend_engineer",
CollaborationEventKind::WorkPublished,
)
.consumer("frontend_engineer")
.payload(serde_json::json!({ "routes": ["/api/users"] }))
.timestamp(1719000000000);
assert_eq!(event.consumer.as_deref(), Some("frontend_engineer"));
assert_eq!(event.payload, serde_json::json!({ "routes": ["/api/users"] }));
assert_eq!(event.timestamp, 1719000000000);
}
#[test]
fn collaboration_event_kind_equality_and_hash() {
use std::collections::HashSet;
let mut set = HashSet::new();
set.insert(CollaborationEventKind::NeedWork);
set.insert(CollaborationEventKind::NeedWork);
set.insert(CollaborationEventKind::Completed);
assert_eq!(set.len(), 2);
}
#[test]
fn collaboration_event_kind_copy() {
let kind = CollaborationEventKind::Blocked;
let copy = kind;
assert_eq!(kind, copy);
}
#[test]
fn workspace_metadata_serialization_roundtrip() {
let meta = WorkspaceMetadata {
project_name: "test-proj".to_string(),
session_id: Some("sess-1".to_string()),
created_at: Some(1719000000),
};
let json = serde_json::to_string(&meta).unwrap();
let deserialized: WorkspaceMetadata = serde_json::from_str(&json).unwrap();
assert_eq!(meta, deserialized);
}
#[test]
fn collaboration_event_serialization_roundtrip() {
let event = CollaborationEvent::new(
"corr-rt",
"schema",
"db_engineer",
CollaborationEventKind::FeedbackRequested,
)
.consumer("reviewer")
.payload(serde_json::json!({ "tables": ["users"] }))
.timestamp(1719000000000);
let json = serde_json::to_string(&event).unwrap();
let deserialized: CollaborationEvent = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.correlation_id, "corr-rt");
assert_eq!(deserialized.kind, CollaborationEventKind::FeedbackRequested);
assert_eq!(deserialized.consumer.as_deref(), Some("reviewer"));
}
#[test]
fn request_work_publishes_need_work_event() {
let ws = Workspace::new("./proj").build();
let event = ws.request_work("corr-rw", "api-routes", "frontend");
assert_eq!(event.correlation_id, "corr-rw");
assert_eq!(event.topic, "api-routes");
assert_eq!(event.producer, "frontend");
assert_eq!(event.kind, CollaborationEventKind::NeedWork);
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, CollaborationEventKind::NeedWork);
}
#[test]
fn claim_work_publishes_work_claimed_event() {
let ws = Workspace::new("./proj").build();
ws.claim_work("corr-cw", "api-routes", "backend");
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].correlation_id, "corr-cw");
assert_eq!(events[0].kind, CollaborationEventKind::WorkClaimed);
}
#[test]
fn publish_work_publishes_work_published_with_payload() {
let ws = Workspace::new("./proj").build();
ws.publish_work(
"corr-pw",
"api-routes",
"backend",
serde_json::json!({ "routes": ["/users"] }),
);
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, CollaborationEventKind::WorkPublished);
assert_eq!(events[0].payload, serde_json::json!({ "routes": ["/users"] }));
}
#[test]
fn request_feedback_publishes_feedback_requested_with_payload() {
let ws = Workspace::new("./proj").build();
ws.request_feedback(
"corr-rf",
"api-contract",
"backend",
serde_json::json!({ "schema": "v1" }),
);
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, CollaborationEventKind::FeedbackRequested);
assert_eq!(events[0].payload, serde_json::json!({ "schema": "v1" }));
}
#[test]
fn provide_feedback_publishes_feedback_provided_with_payload() {
let ws = Workspace::new("./proj").build();
ws.provide_feedback(
"corr-pf",
"api-contract",
"reviewer",
serde_json::json!({ "approved": true }),
);
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, CollaborationEventKind::FeedbackProvided);
assert_eq!(events[0].payload, serde_json::json!({ "approved": true }));
}
#[test]
fn signal_blocked_publishes_blocked_with_payload() {
let ws = Workspace::new("./proj").build();
ws.signal_blocked(
"corr-sb",
"database-schema",
"backend",
serde_json::json!({ "needs": "approval" }),
);
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, CollaborationEventKind::Blocked);
assert_eq!(events[0].payload, serde_json::json!({ "needs": "approval" }));
}
#[test]
fn signal_completed_publishes_completed_event() {
let ws = Workspace::new("./proj").build();
ws.signal_completed("corr-sc", "api-routes", "backend");
let events = ws.events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].correlation_id, "corr-sc");
assert_eq!(events[0].kind, CollaborationEventKind::Completed);
}
#[tokio::test]
async fn wait_for_work_returns_work_published_event() {
let ws = Workspace::new("./proj").build();
let ws_clone = ws.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
ws_clone.claim_work("corr-wfw", "api", "backend");
ws_clone.publish_work(
"corr-wfw",
"api",
"backend",
serde_json::json!({ "done": true }),
);
});
let result = ws.wait_for_work("corr-wfw", Duration::from_secs(1)).await;
let event = result.expect("should receive WorkPublished event");
assert_eq!(event.kind, CollaborationEventKind::WorkPublished);
assert_eq!(event.payload, serde_json::json!({ "done": true }));
}
#[tokio::test]
async fn wait_for_feedback_returns_feedback_provided_event() {
let ws = Workspace::new("./proj").build();
let ws_clone = ws.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
ws_clone.request_feedback(
"corr-wff",
"contract",
"backend",
serde_json::json!({ "schema": "v1" }),
);
ws_clone.provide_feedback(
"corr-wff",
"contract",
"reviewer",
serde_json::json!({ "approved": true }),
);
});
let result = ws.wait_for_feedback("corr-wff", Duration::from_secs(1)).await;
let event = result.expect("should receive FeedbackProvided event");
assert_eq!(event.kind, CollaborationEventKind::FeedbackProvided);
assert_eq!(event.payload, serde_json::json!({ "approved": true }));
}
#[tokio::test]
async fn wait_for_kind_filters_by_both_correlation_and_kind() {
let ws = Workspace::new("./proj").build();
let ws_clone = ws.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
ws_clone.request_work("corr-wfk", "topic", "agent-a");
ws_clone.claim_work("other-corr", "topic", "agent-b");
ws_clone.claim_work("corr-wfk", "topic", "agent-b");
});
let result = ws
.wait_for_kind("corr-wfk", CollaborationEventKind::WorkClaimed, Duration::from_secs(1))
.await;
let event = result.expect("should receive matching event");
assert_eq!(event.correlation_id, "corr-wfk");
assert_eq!(event.kind, CollaborationEventKind::WorkClaimed);
assert_eq!(event.producer, "agent-b");
}
#[tokio::test]
async fn wait_for_kind_times_out_when_kind_does_not_match() {
let ws = Workspace::new("./proj").build();
let ws_clone = ws.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(5)).await;
ws_clone.request_work("corr-to", "topic", "agent");
});
let result = ws
.wait_for_kind("corr-to", CollaborationEventKind::Completed, Duration::from_millis(100))
.await;
assert!(result.is_none());
}
#[test]
fn full_collaboration_flow_via_helpers() {
let ws = Workspace::new("./proj").build();
ws.request_work("flow-1", "backend-api", "coordinator");
ws.claim_work("flow-1", "backend-api", "backend_engineer");
ws.publish_work(
"flow-1",
"backend-api",
"backend_engineer",
serde_json::json!({ "endpoints": 3 }),
);
ws.request_feedback(
"flow-1",
"backend-api",
"backend_engineer",
serde_json::json!({ "review": "please" }),
);
ws.provide_feedback(
"flow-1",
"backend-api",
"reviewer",
serde_json::json!({ "approved": true }),
);
ws.signal_completed("flow-1", "backend-api", "backend_engineer");
let events = ws.events();
assert_eq!(events.len(), 6);
assert_eq!(events[0].kind, CollaborationEventKind::NeedWork);
assert_eq!(events[1].kind, CollaborationEventKind::WorkClaimed);
assert_eq!(events[2].kind, CollaborationEventKind::WorkPublished);
assert_eq!(events[3].kind, CollaborationEventKind::FeedbackRequested);
assert_eq!(events[4].kind, CollaborationEventKind::FeedbackProvided);
assert_eq!(events[5].kind, CollaborationEventKind::Completed);
}
}