adk_code/workspace.rs
1//! Workspace and collaboration types for collaborative project builds.
2//!
3//! This module provides the [`Workspace`] abstraction for multi-agent code generation
4//! and project-building flows, along with typed [`CollaborationEvent`]s that support
5//! ownership, correlation, and wait/resume semantics.
6//!
7//! ## Overview
8//!
9//! A [`Workspace`] represents a shared project context where specialist agents
10//! coordinate through typed collaboration events rather than raw pub/sub.
11//! The collaboration model preserves ownership, correlation, and completion
12//! semantics so agents can request dependencies and resume only when matching
13//! work is published.
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use adk_code::Workspace;
19//!
20//! let workspace = Workspace::new("./demo-site")
21//! .project_name("demo-site")
22//! .session_id("session-123")
23//! .build();
24//! assert_eq!(workspace.metadata().project_name, "demo-site");
25//! ```
26//!
27//! ## Collaboration Events
28//!
29//! ```rust
30//! use adk_code::{CollaborationEvent, CollaborationEventKind};
31//!
32//! let event = CollaborationEvent::new(
33//! "corr-001",
34//! "backend-api",
35//! "backend_engineer",
36//! CollaborationEventKind::WorkPublished,
37//! );
38//! assert_eq!(event.kind, CollaborationEventKind::WorkPublished);
39//! ```
40//!
41//! ## Publish, Subscribe, and Wait/Resume
42//!
43//! ```rust,no_run
44//! # async fn example() {
45//! use adk_code::{CollaborationEvent, CollaborationEventKind, Workspace};
46//! use std::time::Duration;
47//!
48//! let workspace = Workspace::new("./project").build();
49//!
50//! // Subscribe to all events
51//! let mut rx = workspace.subscribe();
52//!
53//! // Publish an event
54//! workspace.publish(CollaborationEvent::new(
55//! "corr-1", "api", "backend", CollaborationEventKind::WorkPublished,
56//! ));
57//!
58//! // Wait for a correlated response
59//! let result = workspace.wait_for("corr-1", Duration::from_secs(5)).await;
60//! # }
61//! ```
62
63use serde::{Deserialize, Serialize};
64use serde_json::Value;
65use std::path::PathBuf;
66use std::sync::{Arc, RwLock};
67use std::time::Duration;
68use tokio::sync::broadcast;
69
70/// Default capacity for the internal broadcast channel.
71const DEFAULT_CHANNEL_CAPACITY: usize = 256;
72
73/// Internal shared state behind [`Workspace`].
74///
75/// This struct is wrapped in `Arc` so that `Workspace` can be cheaply cloned
76/// and shared across agents and async tasks. The broadcast channel provides
77/// the in-process collaboration transport.
78#[derive(Debug)]
79struct WorkspaceInner {
80 /// The shared project root directory.
81 root: PathBuf,
82 /// Project and session metadata.
83 metadata: WorkspaceMetadata,
84 /// Sender side of the broadcast channel for collaboration events.
85 tx: broadcast::Sender<CollaborationEvent>,
86 /// Append-only event log so [`Workspace::events`] can return history
87 /// regardless of when subscribers were created.
88 event_log: RwLock<Vec<CollaborationEvent>>,
89}
90
91/// A shared project context for collaborative code generation and execution.
92///
93/// `Workspace` is the public anchor for multi-agent project-building flows.
94/// It represents a shared project root, metadata, and collaboration state.
95/// Specialist agents attached to the same workspace can publish and consume
96/// typed [`CollaborationEvent`]s without configuring raw pub/sub directly.
97///
98/// Internally, `Workspace` uses `Arc<WorkspaceInner>` so it can be cheaply
99/// cloned and shared across agents and async boundaries. The collaboration
100/// transport is an in-process broadcast channel — transport details are hidden
101/// from the public API.
102///
103/// Use [`Workspace::new`] to get a [`WorkspaceBuilder`] for ergonomic construction.
104///
105/// # Example
106///
107/// ```rust
108/// use adk_code::Workspace;
109///
110/// let workspace = Workspace::new("./my-project")
111/// .project_name("my-project")
112/// .session_id("sess-abc")
113/// .build();
114///
115/// assert_eq!(workspace.root(), &std::path::PathBuf::from("./my-project"));
116/// assert_eq!(workspace.metadata().project_name, "my-project");
117/// assert_eq!(workspace.metadata().session_id.as_deref(), Some("sess-abc"));
118/// ```
119#[derive(Debug, Clone)]
120pub struct Workspace {
121 inner: Arc<WorkspaceInner>,
122}
123
124impl Workspace {
125 /// Start building a new workspace rooted at the given path.
126 ///
127 /// Returns a [`WorkspaceBuilder`] for fluent configuration.
128 ///
129 /// # Example
130 ///
131 /// ```rust
132 /// use adk_code::Workspace;
133 ///
134 /// let ws = Workspace::new("/tmp/project").build();
135 /// assert_eq!(ws.root(), &std::path::PathBuf::from("/tmp/project"));
136 /// ```
137 // Intentional: `new` returns a builder per the design doc's fluent API
138 // (`Workspace::new("./path").project_name("demo").build()`).
139 #[allow(clippy::new_ret_no_self)]
140 pub fn new(root: impl Into<PathBuf>) -> WorkspaceBuilder {
141 WorkspaceBuilder {
142 root: root.into(),
143 project_name: None,
144 session_id: None,
145 created_at: None,
146 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
147 }
148 }
149
150 /// The shared project root directory.
151 pub fn root(&self) -> &PathBuf {
152 &self.inner.root
153 }
154
155 /// Project and session metadata.
156 pub fn metadata(&self) -> &WorkspaceMetadata {
157 &self.inner.metadata
158 }
159
160 /// Publish a collaboration event to all subscribers.
161 ///
162 /// This is a non-blocking operation. If there are no active subscribers,
163 /// the event is silently dropped. Returns the number of receivers that
164 /// received the event.
165 ///
166 /// # Example
167 ///
168 /// ```rust
169 /// use adk_code::{CollaborationEvent, CollaborationEventKind, Workspace};
170 ///
171 /// let ws = Workspace::new("./proj").build();
172 /// let mut rx = ws.subscribe();
173 ///
174 /// ws.publish(CollaborationEvent::new(
175 /// "c1", "api", "backend", CollaborationEventKind::WorkPublished,
176 /// ));
177 /// ```
178 pub fn publish(&self, event: CollaborationEvent) -> usize {
179 // Append to the persistent event log before broadcasting.
180 if let Ok(mut log) = self.inner.event_log.write() {
181 log.push(event.clone());
182 }
183 // If no receivers are listening, send returns Err — that is fine.
184 self.inner.tx.send(event).unwrap_or(0)
185 }
186
187 /// Subscribe to collaboration events on this workspace.
188 ///
189 /// Returns a [`broadcast::Receiver`] that yields every event published
190 /// after the subscription is created. Each subscriber gets its own
191 /// independent stream of events.
192 ///
193 /// # Example
194 ///
195 /// ```rust
196 /// use adk_code::{CollaborationEvent, CollaborationEventKind, Workspace};
197 ///
198 /// let ws = Workspace::new("./proj").build();
199 /// let mut rx = ws.subscribe();
200 ///
201 /// ws.publish(CollaborationEvent::new(
202 /// "c1", "topic", "producer", CollaborationEventKind::NeedWork,
203 /// ));
204 /// ```
205 pub fn subscribe(&self) -> broadcast::Receiver<CollaborationEvent> {
206 self.inner.tx.subscribe()
207 }
208
209 /// Wait for a collaboration event matching the given `correlation_id`.
210 ///
211 /// Subscribes to the workspace event stream and returns the first event
212 /// whose `correlation_id` matches. If no matching event arrives within
213 /// `timeout`, returns `None`.
214 ///
215 /// This implements the wait/resume pattern: an agent can publish a
216 /// [`CollaborationEventKind::NeedWork`] event and then call `wait_for`
217 /// to suspend until the matching [`CollaborationEventKind::WorkPublished`]
218 /// (or other correlated response) arrives.
219 ///
220 /// # Example
221 ///
222 /// ```rust,no_run
223 /// # async fn example() {
224 /// use adk_code::{CollaborationEvent, CollaborationEventKind, Workspace};
225 /// use std::time::Duration;
226 ///
227 /// let ws = Workspace::new("./proj").build();
228 /// // In practice another agent would publish the matching event.
229 /// let result = ws.wait_for("corr-42", Duration::from_millis(100)).await;
230 /// assert!(result.is_none()); // timed out — no publisher
231 /// # }
232 /// ```
233 pub async fn wait_for(
234 &self,
235 correlation_id: &str,
236 timeout: Duration,
237 ) -> Option<CollaborationEvent> {
238 let mut rx = self.subscribe();
239 let deadline = tokio::time::sleep(timeout);
240 tokio::pin!(deadline);
241
242 loop {
243 tokio::select! {
244 result = rx.recv() => {
245 match result {
246 Ok(event) if event.correlation_id == correlation_id => {
247 return Some(event);
248 }
249 Ok(_) => {
250 // Not our correlation — keep waiting.
251 continue;
252 }
253 Err(broadcast::error::RecvError::Lagged(skipped)) => {
254 tracing::warn!(
255 skipped,
256 "workspace subscriber lagged, {skipped} events dropped"
257 );
258 continue;
259 }
260 Err(broadcast::error::RecvError::Closed) => {
261 return None;
262 }
263 }
264 }
265 () = &mut deadline => {
266 return None;
267 }
268 }
269 }
270 }
271
272 /// Get a snapshot of all events published to this workspace.
273 ///
274 /// Returns a clone of the internal event log. Unlike the broadcast
275 /// channel (which has a fixed capacity and drops old events for slow
276 /// subscribers), the event log retains every event published since
277 /// workspace creation.
278 ///
279 /// # Example
280 ///
281 /// ```rust
282 /// use adk_code::{CollaborationEvent, CollaborationEventKind, Workspace};
283 ///
284 /// let ws = Workspace::new("./proj").build();
285 /// ws.publish(CollaborationEvent::new(
286 /// "c1", "topic", "producer", CollaborationEventKind::Completed,
287 /// ));
288 /// let events = ws.events();
289 /// // events may contain the published event if still in the buffer
290 /// ```
291 pub fn events(&self) -> Vec<CollaborationEvent> {
292 self.inner.event_log.read().map(|log| log.clone()).unwrap_or_default()
293 }
294
295 // ── Agent-facing workspace integration helpers ──────────────────────
296 //
297 // These thin wrappers make the common collaborative patterns easy
298 // without exposing event construction details. Each method constructs
299 // the appropriate `CollaborationEvent` and publishes it.
300
301 /// Request work from another specialist or coordinator.
302 ///
303 /// Publishes a [`CollaborationEventKind::NeedWork`] event and returns
304 /// the event that was published. The caller can then use
305 /// [`Workspace::wait_for_work`] to suspend until the matching
306 /// [`CollaborationEventKind::WorkPublished`] event arrives.
307 ///
308 /// # Example
309 ///
310 /// ```rust
311 /// use adk_code::Workspace;
312 ///
313 /// let ws = Workspace::new("./proj").build();
314 /// let event = ws.request_work("corr-1", "api-routes", "frontend_engineer");
315 /// assert_eq!(event.kind, adk_code::CollaborationEventKind::NeedWork);
316 /// ```
317 pub fn request_work(
318 &self,
319 correlation_id: impl Into<String>,
320 topic: impl Into<String>,
321 producer: impl Into<String>,
322 ) -> CollaborationEvent {
323 let event = CollaborationEvent::new(
324 correlation_id,
325 topic,
326 producer,
327 CollaborationEventKind::NeedWork,
328 );
329 self.publish(event.clone());
330 event
331 }
332
333 /// Claim ownership of a requested work item.
334 ///
335 /// Publishes a [`CollaborationEventKind::WorkClaimed`] event to signal
336 /// that this agent is taking responsibility for the work.
337 ///
338 /// # Example
339 ///
340 /// ```rust
341 /// use adk_code::Workspace;
342 ///
343 /// let ws = Workspace::new("./proj").build();
344 /// ws.claim_work("corr-1", "api-routes", "backend_engineer");
345 /// ```
346 pub fn claim_work(
347 &self,
348 correlation_id: impl Into<String>,
349 topic: impl Into<String>,
350 producer: impl Into<String>,
351 ) {
352 self.publish(CollaborationEvent::new(
353 correlation_id,
354 topic,
355 producer,
356 CollaborationEventKind::WorkClaimed,
357 ));
358 }
359
360 /// Publish completed work to the workspace.
361 ///
362 /// Publishes a [`CollaborationEventKind::WorkPublished`] event with the
363 /// given payload. Agents waiting via [`Workspace::wait_for_work`] on the
364 /// same `correlation_id` will be resumed.
365 ///
366 /// # Example
367 ///
368 /// ```rust
369 /// use adk_code::Workspace;
370 ///
371 /// let ws = Workspace::new("./proj").build();
372 /// ws.publish_work(
373 /// "corr-1",
374 /// "api-routes",
375 /// "backend_engineer",
376 /// serde_json::json!({ "routes": ["/api/users"] }),
377 /// );
378 /// ```
379 pub fn publish_work(
380 &self,
381 correlation_id: impl Into<String>,
382 topic: impl Into<String>,
383 producer: impl Into<String>,
384 payload: Value,
385 ) {
386 self.publish(
387 CollaborationEvent::new(
388 correlation_id,
389 topic,
390 producer,
391 CollaborationEventKind::WorkPublished,
392 )
393 .payload(payload),
394 );
395 }
396
397 /// Request feedback from another specialist or reviewer.
398 ///
399 /// Publishes a [`CollaborationEventKind::FeedbackRequested`] event.
400 /// The caller can then use [`Workspace::wait_for_feedback`] to suspend
401 /// until the matching [`CollaborationEventKind::FeedbackProvided`] arrives.
402 ///
403 /// # Example
404 ///
405 /// ```rust
406 /// use adk_code::Workspace;
407 ///
408 /// let ws = Workspace::new("./proj").build();
409 /// ws.request_feedback(
410 /// "corr-2",
411 /// "api-contract",
412 /// "backend_engineer",
413 /// serde_json::json!({ "schema": "v1" }),
414 /// );
415 /// ```
416 pub fn request_feedback(
417 &self,
418 correlation_id: impl Into<String>,
419 topic: impl Into<String>,
420 producer: impl Into<String>,
421 payload: Value,
422 ) {
423 self.publish(
424 CollaborationEvent::new(
425 correlation_id,
426 topic,
427 producer,
428 CollaborationEventKind::FeedbackRequested,
429 )
430 .payload(payload),
431 );
432 }
433
434 /// Provide feedback in response to a feedback request.
435 ///
436 /// Publishes a [`CollaborationEventKind::FeedbackProvided`] event.
437 /// Agents waiting via [`Workspace::wait_for_feedback`] on the same
438 /// `correlation_id` will be resumed.
439 ///
440 /// # Example
441 ///
442 /// ```rust
443 /// use adk_code::Workspace;
444 ///
445 /// let ws = Workspace::new("./proj").build();
446 /// ws.provide_feedback(
447 /// "corr-2",
448 /// "api-contract",
449 /// "reviewer",
450 /// serde_json::json!({ "approved": true }),
451 /// );
452 /// ```
453 pub fn provide_feedback(
454 &self,
455 correlation_id: impl Into<String>,
456 topic: impl Into<String>,
457 producer: impl Into<String>,
458 payload: Value,
459 ) {
460 self.publish(
461 CollaborationEvent::new(
462 correlation_id,
463 topic,
464 producer,
465 CollaborationEventKind::FeedbackProvided,
466 )
467 .payload(payload),
468 );
469 }
470
471 /// Signal that this agent is blocked and cannot continue.
472 ///
473 /// Publishes a [`CollaborationEventKind::Blocked`] event with a payload
474 /// describing what is needed to unblock.
475 ///
476 /// # Example
477 ///
478 /// ```rust
479 /// use adk_code::Workspace;
480 ///
481 /// let ws = Workspace::new("./proj").build();
482 /// ws.signal_blocked(
483 /// "corr-3",
484 /// "database-schema",
485 /// "backend_engineer",
486 /// serde_json::json!({ "needs": "schema approval" }),
487 /// );
488 /// ```
489 pub fn signal_blocked(
490 &self,
491 correlation_id: impl Into<String>,
492 topic: impl Into<String>,
493 producer: impl Into<String>,
494 payload: Value,
495 ) {
496 self.publish(
497 CollaborationEvent::new(
498 correlation_id,
499 topic,
500 producer,
501 CollaborationEventKind::Blocked,
502 )
503 .payload(payload),
504 );
505 }
506
507 /// Signal that a work item is completed.
508 ///
509 /// Publishes a [`CollaborationEventKind::Completed`] event.
510 ///
511 /// # Example
512 ///
513 /// ```rust
514 /// use adk_code::Workspace;
515 ///
516 /// let ws = Workspace::new("./proj").build();
517 /// ws.signal_completed("corr-1", "api-routes", "backend_engineer");
518 /// ```
519 pub fn signal_completed(
520 &self,
521 correlation_id: impl Into<String>,
522 topic: impl Into<String>,
523 producer: impl Into<String>,
524 ) {
525 self.publish(CollaborationEvent::new(
526 correlation_id,
527 topic,
528 producer,
529 CollaborationEventKind::Completed,
530 ));
531 }
532
533 /// Wait for a [`CollaborationEventKind::WorkPublished`] event matching
534 /// the given `correlation_id`.
535 ///
536 /// This is a convenience wrapper over [`Workspace::wait_for_kind`] that
537 /// filters for `WorkPublished` events specifically.
538 ///
539 /// # Example
540 ///
541 /// ```rust,no_run
542 /// # async fn example() {
543 /// use adk_code::Workspace;
544 /// use std::time::Duration;
545 ///
546 /// let ws = Workspace::new("./proj").build();
547 /// let result = ws.wait_for_work("corr-1", Duration::from_secs(5)).await;
548 /// # }
549 /// ```
550 pub async fn wait_for_work(
551 &self,
552 correlation_id: &str,
553 timeout: Duration,
554 ) -> Option<CollaborationEvent> {
555 self.wait_for_kind(correlation_id, CollaborationEventKind::WorkPublished, timeout).await
556 }
557
558 /// Wait for a [`CollaborationEventKind::FeedbackProvided`] event matching
559 /// the given `correlation_id`.
560 ///
561 /// This is a convenience wrapper over [`Workspace::wait_for_kind`] that
562 /// filters for `FeedbackProvided` events specifically.
563 ///
564 /// # Example
565 ///
566 /// ```rust,no_run
567 /// # async fn example() {
568 /// use adk_code::Workspace;
569 /// use std::time::Duration;
570 ///
571 /// let ws = Workspace::new("./proj").build();
572 /// let result = ws.wait_for_feedback("corr-2", Duration::from_secs(5)).await;
573 /// # }
574 /// ```
575 pub async fn wait_for_feedback(
576 &self,
577 correlation_id: &str,
578 timeout: Duration,
579 ) -> Option<CollaborationEvent> {
580 self.wait_for_kind(correlation_id, CollaborationEventKind::FeedbackProvided, timeout).await
581 }
582
583 /// Wait for a collaboration event matching both `correlation_id` and `kind`.
584 ///
585 /// Subscribes to the workspace event stream and returns the first event
586 /// whose `correlation_id` and `kind` both match. If no matching event
587 /// arrives within `timeout`, returns `None`.
588 ///
589 /// This is the most precise wait primitive — use it when you need to
590 /// filter on a specific event kind rather than any correlated event.
591 ///
592 /// # Example
593 ///
594 /// ```rust,no_run
595 /// # async fn example() {
596 /// use adk_code::{CollaborationEventKind, Workspace};
597 /// use std::time::Duration;
598 ///
599 /// let ws = Workspace::new("./proj").build();
600 /// let result = ws
601 /// .wait_for_kind("corr-1", CollaborationEventKind::WorkClaimed, Duration::from_secs(5))
602 /// .await;
603 /// # }
604 /// ```
605 pub async fn wait_for_kind(
606 &self,
607 correlation_id: &str,
608 kind: CollaborationEventKind,
609 timeout: Duration,
610 ) -> Option<CollaborationEvent> {
611 let mut rx = self.subscribe();
612 let deadline = tokio::time::sleep(timeout);
613 tokio::pin!(deadline);
614
615 loop {
616 tokio::select! {
617 result = rx.recv() => {
618 match result {
619 Ok(event)
620 if event.correlation_id == correlation_id
621 && event.kind == kind =>
622 {
623 return Some(event);
624 }
625 Ok(_) => continue,
626 Err(broadcast::error::RecvError::Lagged(skipped)) => {
627 tracing::warn!(
628 skipped,
629 "workspace subscriber lagged, {skipped} events dropped"
630 );
631 continue;
632 }
633 Err(broadcast::error::RecvError::Closed) => {
634 return None;
635 }
636 }
637 }
638 () = &mut deadline => {
639 return None;
640 }
641 }
642 }
643 }
644}
645
646/// Builder for constructing a [`Workspace`] with fluent configuration.
647///
648/// # Example
649///
650/// ```rust
651/// use adk_code::Workspace;
652///
653/// let workspace = Workspace::new("./app")
654/// .project_name("my-app")
655/// .session_id("session-42")
656/// .created_at(1719000000)
657/// .build();
658///
659/// assert_eq!(workspace.metadata().project_name, "my-app");
660/// assert_eq!(workspace.metadata().created_at, Some(1719000000));
661/// ```
662#[derive(Debug, Clone)]
663pub struct WorkspaceBuilder {
664 root: PathBuf,
665 project_name: Option<String>,
666 session_id: Option<String>,
667 created_at: Option<u64>,
668 channel_capacity: usize,
669}
670
671impl WorkspaceBuilder {
672 /// Set the project name.
673 ///
674 /// If not set, defaults to the root directory's file name or `"unnamed"`.
675 pub fn project_name(mut self, name: impl Into<String>) -> Self {
676 self.project_name = Some(name.into());
677 self
678 }
679
680 /// Set the session ID for execution correlation.
681 pub fn session_id(mut self, id: impl Into<String>) -> Self {
682 self.session_id = Some(id.into());
683 self
684 }
685
686 /// Set the workspace creation timestamp (Unix epoch seconds).
687 pub fn created_at(mut self, timestamp: u64) -> Self {
688 self.created_at = Some(timestamp);
689 self
690 }
691
692 /// Set the broadcast channel capacity for collaboration events.
693 ///
694 /// Defaults to 256. Larger values retain more event history at the cost
695 /// of memory. Events beyond the capacity are dropped for slow subscribers.
696 pub fn channel_capacity(mut self, capacity: usize) -> Self {
697 self.channel_capacity = capacity;
698 self
699 }
700
701 /// Build the [`Workspace`].
702 ///
703 /// If `project_name` was not set, it defaults to the root directory's
704 /// file name component, or `"unnamed"` if the path has no file name.
705 pub fn build(self) -> Workspace {
706 let project_name = self.project_name.unwrap_or_else(|| {
707 self.root.file_name().and_then(|n| n.to_str()).unwrap_or("unnamed").to_string()
708 });
709
710 let (tx, _rx) = broadcast::channel(self.channel_capacity);
711
712 Workspace {
713 inner: Arc::new(WorkspaceInner {
714 root: self.root,
715 metadata: WorkspaceMetadata {
716 project_name,
717 session_id: self.session_id,
718 created_at: self.created_at,
719 },
720 tx,
721 event_log: RwLock::new(Vec::new()),
722 }),
723 }
724 }
725}
726
727/// Metadata about a workspace project and execution session.
728///
729/// # Example
730///
731/// ```rust
732/// use adk_code::WorkspaceMetadata;
733///
734/// let meta = WorkspaceMetadata {
735/// project_name: "demo".to_string(),
736/// session_id: Some("sess-1".to_string()),
737/// created_at: Some(1719000000),
738/// };
739/// assert_eq!(meta.project_name, "demo");
740/// ```
741#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
742#[serde(rename_all = "camelCase")]
743pub struct WorkspaceMetadata {
744 /// Human-readable project name.
745 pub project_name: String,
746 /// Optional session ID for execution and telemetry correlation.
747 pub session_id: Option<String>,
748 /// Optional creation timestamp (Unix epoch seconds).
749 pub created_at: Option<u64>,
750}
751
752/// The kind of a collaboration event in a shared workspace.
753///
754/// These typed event kinds support ownership, correlation, and wait/resume
755/// semantics for multi-agent project builds. They are more disciplined than
756/// raw pub/sub and preserve completion semantics.
757///
758/// # Example
759///
760/// ```rust
761/// use adk_code::CollaborationEventKind;
762///
763/// let kind = CollaborationEventKind::NeedWork;
764/// assert_ne!(kind, CollaborationEventKind::Completed);
765/// ```
766#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
767#[serde(rename_all = "camelCase")]
768pub enum CollaborationEventKind {
769 /// An agent requests a dependency from another specialist or coordinator.
770 NeedWork,
771 /// Another agent or coordinator accepts ownership of the requested work.
772 WorkClaimed,
773 /// The requested work product is now available in the workspace.
774 WorkPublished,
775 /// The producer asks for review or contract validation.
776 FeedbackRequested,
777 /// A specialist responds with approval or requested changes.
778 FeedbackProvided,
779 /// The producer cannot continue without another dependency or decision.
780 Blocked,
781 /// The work item is done.
782 Completed,
783}
784
785impl std::fmt::Display for CollaborationEventKind {
786 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
787 match self {
788 Self::NeedWork => write!(f, "NeedWork"),
789 Self::WorkClaimed => write!(f, "WorkClaimed"),
790 Self::WorkPublished => write!(f, "WorkPublished"),
791 Self::FeedbackRequested => write!(f, "FeedbackRequested"),
792 Self::FeedbackProvided => write!(f, "FeedbackProvided"),
793 Self::Blocked => write!(f, "Blocked"),
794 Self::Completed => write!(f, "Completed"),
795 }
796 }
797}
798
799/// A typed collaboration event for cross-agent coordination in a shared workspace.
800///
801/// Collaboration events carry correlation IDs, topic names, producer/consumer
802/// identities, and structured payloads. They support the wait/resume pattern:
803/// an agent can publish a [`CollaborationEventKind::NeedWork`] event and resume
804/// only when a matching [`CollaborationEventKind::WorkPublished`] event arrives.
805///
806/// # Example
807///
808/// ```rust
809/// use adk_code::{CollaborationEvent, CollaborationEventKind};
810///
811/// let event = CollaborationEvent::new(
812/// "corr-42",
813/// "api-routes",
814/// "backend_engineer",
815/// CollaborationEventKind::WorkPublished,
816/// )
817/// .consumer("frontend_engineer")
818/// .payload(serde_json::json!({ "routes": ["/api/users"] }));
819///
820/// assert_eq!(event.correlation_id, "corr-42");
821/// assert_eq!(event.consumer.as_deref(), Some("frontend_engineer"));
822/// ```
823#[derive(Debug, Clone, Serialize, Deserialize)]
824#[serde(rename_all = "camelCase")]
825pub struct CollaborationEvent {
826 /// Correlation ID linking related events (e.g., request and response).
827 pub correlation_id: String,
828 /// Topic or work item name this event relates to.
829 pub topic: String,
830 /// Identity of the agent that produced this event.
831 pub producer: String,
832 /// Identity of the intended consumer, if targeted.
833 pub consumer: Option<String>,
834 /// The kind of collaboration event.
835 pub kind: CollaborationEventKind,
836 /// Structured payload carrying event-specific data.
837 pub payload: Value,
838 /// Timestamp when the event was created (Unix epoch milliseconds).
839 pub timestamp: u64,
840}
841
842impl CollaborationEvent {
843 /// Create a new collaboration event with the given correlation ID, topic,
844 /// producer, and kind.
845 ///
846 /// The payload defaults to `null`, consumer defaults to `None`, and
847 /// timestamp defaults to `0` (callers should set it via [`Self::timestamp`]).
848 ///
849 /// # Example
850 ///
851 /// ```rust
852 /// use adk_code::{CollaborationEvent, CollaborationEventKind};
853 ///
854 /// let event = CollaborationEvent::new(
855 /// "req-1",
856 /// "database-schema",
857 /// "coordinator",
858 /// CollaborationEventKind::NeedWork,
859 /// );
860 /// assert_eq!(event.kind, CollaborationEventKind::NeedWork);
861 /// assert_eq!(event.producer, "coordinator");
862 /// ```
863 pub fn new(
864 correlation_id: impl Into<String>,
865 topic: impl Into<String>,
866 producer: impl Into<String>,
867 kind: CollaborationEventKind,
868 ) -> Self {
869 Self {
870 correlation_id: correlation_id.into(),
871 topic: topic.into(),
872 producer: producer.into(),
873 consumer: None,
874 kind,
875 payload: Value::Null,
876 timestamp: 0,
877 }
878 }
879
880 /// Set the intended consumer for this event.
881 pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
882 self.consumer = Some(consumer.into());
883 self
884 }
885
886 /// Set the structured payload for this event.
887 pub fn payload(mut self, payload: Value) -> Self {
888 self.payload = payload;
889 self
890 }
891
892 /// Set the timestamp (Unix epoch milliseconds).
893 pub fn timestamp(mut self, ts: u64) -> Self {
894 self.timestamp = ts;
895 self
896 }
897}
898
899#[cfg(test)]
900mod tests {
901 use super::*;
902
903 #[test]
904 fn workspace_builder_defaults_project_name_from_root() {
905 let ws = Workspace::new("/tmp/my-project").build();
906 assert_eq!(ws.root(), &PathBuf::from("/tmp/my-project"));
907 assert_eq!(ws.metadata().project_name, "my-project");
908 assert_eq!(ws.metadata().session_id, None);
909 assert_eq!(ws.metadata().created_at, None);
910 }
911
912 #[test]
913 fn workspace_builder_with_all_fields() {
914 let ws = Workspace::new("./demo")
915 .project_name("demo-site")
916 .session_id("sess-abc")
917 .created_at(1719000000)
918 .build();
919 assert_eq!(ws.root(), &PathBuf::from("./demo"));
920 assert_eq!(ws.metadata().project_name, "demo-site");
921 assert_eq!(ws.metadata().session_id.as_deref(), Some("sess-abc"));
922 assert_eq!(ws.metadata().created_at, Some(1719000000));
923 }
924
925 #[test]
926 fn workspace_builder_unnamed_fallback() {
927 let ws = Workspace::new("/").build();
928 assert_eq!(ws.metadata().project_name, "unnamed");
929 }
930
931 #[test]
932 fn workspace_clone_shares_transport() {
933 let ws1 = Workspace::new("./proj").build();
934 let ws2 = ws1.clone();
935 let mut rx = ws2.subscribe();
936
937 ws1.publish(CollaborationEvent::new(
938 "c1",
939 "topic",
940 "producer",
941 CollaborationEventKind::WorkPublished,
942 ));
943
944 let event = rx.try_recv().expect("should receive event from clone");
945 assert_eq!(event.correlation_id, "c1");
946 }
947
948 #[test]
949 fn publish_with_no_subscribers_returns_zero() {
950 let ws = Workspace::new("./proj").build();
951 let count = ws.publish(CollaborationEvent::new(
952 "c1",
953 "topic",
954 "producer",
955 CollaborationEventKind::NeedWork,
956 ));
957 assert_eq!(count, 0);
958 }
959
960 #[test]
961 fn publish_with_subscriber_returns_count() {
962 let ws = Workspace::new("./proj").build();
963 let _rx1 = ws.subscribe();
964 let _rx2 = ws.subscribe();
965
966 let count = ws.publish(CollaborationEvent::new(
967 "c1",
968 "topic",
969 "producer",
970 CollaborationEventKind::NeedWork,
971 ));
972 assert_eq!(count, 2);
973 }
974
975 #[test]
976 fn subscribe_receives_published_events() {
977 let ws = Workspace::new("./proj").build();
978 let mut rx = ws.subscribe();
979
980 ws.publish(CollaborationEvent::new(
981 "c1",
982 "api",
983 "backend",
984 CollaborationEventKind::WorkPublished,
985 ));
986 ws.publish(CollaborationEvent::new(
987 "c2",
988 "schema",
989 "db",
990 CollaborationEventKind::Completed,
991 ));
992
993 let e1 = rx.try_recv().unwrap();
994 assert_eq!(e1.correlation_id, "c1");
995 assert_eq!(e1.kind, CollaborationEventKind::WorkPublished);
996
997 let e2 = rx.try_recv().unwrap();
998 assert_eq!(e2.correlation_id, "c2");
999 assert_eq!(e2.kind, CollaborationEventKind::Completed);
1000 }
1001
1002 #[tokio::test]
1003 async fn wait_for_returns_matching_event() {
1004 let ws = Workspace::new("./proj").build();
1005 let ws_clone = ws.clone();
1006
1007 // Spawn a task that publishes the matching event after a short delay.
1008 tokio::spawn(async move {
1009 tokio::time::sleep(Duration::from_millis(10)).await;
1010 // Publish a non-matching event first.
1011 ws_clone.publish(CollaborationEvent::new(
1012 "other",
1013 "unrelated",
1014 "someone",
1015 CollaborationEventKind::NeedWork,
1016 ));
1017 // Then publish the matching event.
1018 ws_clone.publish(
1019 CollaborationEvent::new(
1020 "target",
1021 "api",
1022 "backend",
1023 CollaborationEventKind::WorkPublished,
1024 )
1025 .payload(serde_json::json!({ "done": true })),
1026 );
1027 });
1028
1029 let result = ws.wait_for("target", Duration::from_secs(1)).await;
1030 let event = result.expect("should receive matching event");
1031 assert_eq!(event.correlation_id, "target");
1032 assert_eq!(event.kind, CollaborationEventKind::WorkPublished);
1033 assert_eq!(event.payload, serde_json::json!({ "done": true }));
1034 }
1035
1036 #[tokio::test]
1037 async fn wait_for_times_out_when_no_match() {
1038 let ws = Workspace::new("./proj").build();
1039 let result = ws.wait_for("nonexistent", Duration::from_millis(50)).await;
1040 assert!(result.is_none());
1041 }
1042
1043 #[tokio::test]
1044 async fn wait_for_ignores_non_matching_events() {
1045 let ws = Workspace::new("./proj").build();
1046 let ws_clone = ws.clone();
1047
1048 tokio::spawn(async move {
1049 tokio::time::sleep(Duration::from_millis(5)).await;
1050 // Publish several non-matching events.
1051 for i in 0..5 {
1052 ws_clone.publish(CollaborationEvent::new(
1053 format!("wrong-{i}"),
1054 "topic",
1055 "producer",
1056 CollaborationEventKind::NeedWork,
1057 ));
1058 }
1059 // Then publish the matching one.
1060 ws_clone.publish(CollaborationEvent::new(
1061 "right",
1062 "topic",
1063 "producer",
1064 CollaborationEventKind::WorkPublished,
1065 ));
1066 });
1067
1068 let result = ws.wait_for("right", Duration::from_secs(1)).await;
1069 assert!(result.is_some());
1070 assert_eq!(result.unwrap().correlation_id, "right");
1071 }
1072
1073 #[test]
1074 fn events_returns_buffered_events() {
1075 let ws = Workspace::new("./proj").channel_capacity(16).build();
1076
1077 ws.publish(CollaborationEvent::new("c1", "t1", "p1", CollaborationEventKind::NeedWork));
1078 ws.publish(CollaborationEvent::new("c2", "t2", "p2", CollaborationEventKind::Completed));
1079
1080 let events = ws.events();
1081 assert_eq!(events.len(), 2);
1082 assert_eq!(events[0].correlation_id, "c1");
1083 assert_eq!(events[1].correlation_id, "c2");
1084 }
1085
1086 #[test]
1087 fn collaboration_event_kind_display() {
1088 assert_eq!(CollaborationEventKind::NeedWork.to_string(), "NeedWork");
1089 assert_eq!(CollaborationEventKind::WorkClaimed.to_string(), "WorkClaimed");
1090 assert_eq!(CollaborationEventKind::WorkPublished.to_string(), "WorkPublished");
1091 assert_eq!(CollaborationEventKind::FeedbackRequested.to_string(), "FeedbackRequested");
1092 assert_eq!(CollaborationEventKind::FeedbackProvided.to_string(), "FeedbackProvided");
1093 assert_eq!(CollaborationEventKind::Blocked.to_string(), "Blocked");
1094 assert_eq!(CollaborationEventKind::Completed.to_string(), "Completed");
1095 }
1096
1097 #[test]
1098 fn collaboration_event_new_defaults() {
1099 let event = CollaborationEvent::new(
1100 "corr-1",
1101 "backend-api",
1102 "coordinator",
1103 CollaborationEventKind::NeedWork,
1104 );
1105 assert_eq!(event.correlation_id, "corr-1");
1106 assert_eq!(event.topic, "backend-api");
1107 assert_eq!(event.producer, "coordinator");
1108 assert_eq!(event.consumer, None);
1109 assert_eq!(event.kind, CollaborationEventKind::NeedWork);
1110 assert_eq!(event.payload, Value::Null);
1111 assert_eq!(event.timestamp, 0);
1112 }
1113
1114 #[test]
1115 fn collaboration_event_builder_methods() {
1116 let event = CollaborationEvent::new(
1117 "corr-2",
1118 "api-routes",
1119 "backend_engineer",
1120 CollaborationEventKind::WorkPublished,
1121 )
1122 .consumer("frontend_engineer")
1123 .payload(serde_json::json!({ "routes": ["/api/users"] }))
1124 .timestamp(1719000000000);
1125
1126 assert_eq!(event.consumer.as_deref(), Some("frontend_engineer"));
1127 assert_eq!(event.payload, serde_json::json!({ "routes": ["/api/users"] }));
1128 assert_eq!(event.timestamp, 1719000000000);
1129 }
1130
1131 #[test]
1132 fn collaboration_event_kind_equality_and_hash() {
1133 use std::collections::HashSet;
1134 let mut set = HashSet::new();
1135 set.insert(CollaborationEventKind::NeedWork);
1136 set.insert(CollaborationEventKind::NeedWork);
1137 set.insert(CollaborationEventKind::Completed);
1138 assert_eq!(set.len(), 2);
1139 }
1140
1141 #[test]
1142 fn collaboration_event_kind_copy() {
1143 let kind = CollaborationEventKind::Blocked;
1144 let copy = kind;
1145 assert_eq!(kind, copy);
1146 }
1147
1148 #[test]
1149 fn workspace_metadata_serialization_roundtrip() {
1150 let meta = WorkspaceMetadata {
1151 project_name: "test-proj".to_string(),
1152 session_id: Some("sess-1".to_string()),
1153 created_at: Some(1719000000),
1154 };
1155 let json = serde_json::to_string(&meta).unwrap();
1156 let deserialized: WorkspaceMetadata = serde_json::from_str(&json).unwrap();
1157 assert_eq!(meta, deserialized);
1158 }
1159
1160 #[test]
1161 fn collaboration_event_serialization_roundtrip() {
1162 let event = CollaborationEvent::new(
1163 "corr-rt",
1164 "schema",
1165 "db_engineer",
1166 CollaborationEventKind::FeedbackRequested,
1167 )
1168 .consumer("reviewer")
1169 .payload(serde_json::json!({ "tables": ["users"] }))
1170 .timestamp(1719000000000);
1171
1172 let json = serde_json::to_string(&event).unwrap();
1173 let deserialized: CollaborationEvent = serde_json::from_str(&json).unwrap();
1174 assert_eq!(deserialized.correlation_id, "corr-rt");
1175 assert_eq!(deserialized.kind, CollaborationEventKind::FeedbackRequested);
1176 assert_eq!(deserialized.consumer.as_deref(), Some("reviewer"));
1177 }
1178
1179 // ── Agent-facing helper tests ──────────────────────────────────────
1180
1181 #[test]
1182 fn request_work_publishes_need_work_event() {
1183 let ws = Workspace::new("./proj").build();
1184 let event = ws.request_work("corr-rw", "api-routes", "frontend");
1185 assert_eq!(event.correlation_id, "corr-rw");
1186 assert_eq!(event.topic, "api-routes");
1187 assert_eq!(event.producer, "frontend");
1188 assert_eq!(event.kind, CollaborationEventKind::NeedWork);
1189
1190 let events = ws.events();
1191 assert_eq!(events.len(), 1);
1192 assert_eq!(events[0].kind, CollaborationEventKind::NeedWork);
1193 }
1194
1195 #[test]
1196 fn claim_work_publishes_work_claimed_event() {
1197 let ws = Workspace::new("./proj").build();
1198 ws.claim_work("corr-cw", "api-routes", "backend");
1199
1200 let events = ws.events();
1201 assert_eq!(events.len(), 1);
1202 assert_eq!(events[0].correlation_id, "corr-cw");
1203 assert_eq!(events[0].kind, CollaborationEventKind::WorkClaimed);
1204 }
1205
1206 #[test]
1207 fn publish_work_publishes_work_published_with_payload() {
1208 let ws = Workspace::new("./proj").build();
1209 ws.publish_work(
1210 "corr-pw",
1211 "api-routes",
1212 "backend",
1213 serde_json::json!({ "routes": ["/users"] }),
1214 );
1215
1216 let events = ws.events();
1217 assert_eq!(events.len(), 1);
1218 assert_eq!(events[0].kind, CollaborationEventKind::WorkPublished);
1219 assert_eq!(events[0].payload, serde_json::json!({ "routes": ["/users"] }));
1220 }
1221
1222 #[test]
1223 fn request_feedback_publishes_feedback_requested_with_payload() {
1224 let ws = Workspace::new("./proj").build();
1225 ws.request_feedback(
1226 "corr-rf",
1227 "api-contract",
1228 "backend",
1229 serde_json::json!({ "schema": "v1" }),
1230 );
1231
1232 let events = ws.events();
1233 assert_eq!(events.len(), 1);
1234 assert_eq!(events[0].kind, CollaborationEventKind::FeedbackRequested);
1235 assert_eq!(events[0].payload, serde_json::json!({ "schema": "v1" }));
1236 }
1237
1238 #[test]
1239 fn provide_feedback_publishes_feedback_provided_with_payload() {
1240 let ws = Workspace::new("./proj").build();
1241 ws.provide_feedback(
1242 "corr-pf",
1243 "api-contract",
1244 "reviewer",
1245 serde_json::json!({ "approved": true }),
1246 );
1247
1248 let events = ws.events();
1249 assert_eq!(events.len(), 1);
1250 assert_eq!(events[0].kind, CollaborationEventKind::FeedbackProvided);
1251 assert_eq!(events[0].payload, serde_json::json!({ "approved": true }));
1252 }
1253
1254 #[test]
1255 fn signal_blocked_publishes_blocked_with_payload() {
1256 let ws = Workspace::new("./proj").build();
1257 ws.signal_blocked(
1258 "corr-sb",
1259 "database-schema",
1260 "backend",
1261 serde_json::json!({ "needs": "approval" }),
1262 );
1263
1264 let events = ws.events();
1265 assert_eq!(events.len(), 1);
1266 assert_eq!(events[0].kind, CollaborationEventKind::Blocked);
1267 assert_eq!(events[0].payload, serde_json::json!({ "needs": "approval" }));
1268 }
1269
1270 #[test]
1271 fn signal_completed_publishes_completed_event() {
1272 let ws = Workspace::new("./proj").build();
1273 ws.signal_completed("corr-sc", "api-routes", "backend");
1274
1275 let events = ws.events();
1276 assert_eq!(events.len(), 1);
1277 assert_eq!(events[0].correlation_id, "corr-sc");
1278 assert_eq!(events[0].kind, CollaborationEventKind::Completed);
1279 }
1280
1281 #[tokio::test]
1282 async fn wait_for_work_returns_work_published_event() {
1283 let ws = Workspace::new("./proj").build();
1284 let ws_clone = ws.clone();
1285
1286 tokio::spawn(async move {
1287 tokio::time::sleep(Duration::from_millis(10)).await;
1288 // Publish a non-matching kind first (same correlation).
1289 ws_clone.claim_work("corr-wfw", "api", "backend");
1290 // Then publish the matching WorkPublished.
1291 ws_clone.publish_work(
1292 "corr-wfw",
1293 "api",
1294 "backend",
1295 serde_json::json!({ "done": true }),
1296 );
1297 });
1298
1299 let result = ws.wait_for_work("corr-wfw", Duration::from_secs(1)).await;
1300 let event = result.expect("should receive WorkPublished event");
1301 assert_eq!(event.kind, CollaborationEventKind::WorkPublished);
1302 assert_eq!(event.payload, serde_json::json!({ "done": true }));
1303 }
1304
1305 #[tokio::test]
1306 async fn wait_for_feedback_returns_feedback_provided_event() {
1307 let ws = Workspace::new("./proj").build();
1308 let ws_clone = ws.clone();
1309
1310 tokio::spawn(async move {
1311 tokio::time::sleep(Duration::from_millis(10)).await;
1312 // Publish a FeedbackRequested first (same correlation, wrong kind).
1313 ws_clone.request_feedback(
1314 "corr-wff",
1315 "contract",
1316 "backend",
1317 serde_json::json!({ "schema": "v1" }),
1318 );
1319 // Then publish the matching FeedbackProvided.
1320 ws_clone.provide_feedback(
1321 "corr-wff",
1322 "contract",
1323 "reviewer",
1324 serde_json::json!({ "approved": true }),
1325 );
1326 });
1327
1328 let result = ws.wait_for_feedback("corr-wff", Duration::from_secs(1)).await;
1329 let event = result.expect("should receive FeedbackProvided event");
1330 assert_eq!(event.kind, CollaborationEventKind::FeedbackProvided);
1331 assert_eq!(event.payload, serde_json::json!({ "approved": true }));
1332 }
1333
1334 #[tokio::test]
1335 async fn wait_for_kind_filters_by_both_correlation_and_kind() {
1336 let ws = Workspace::new("./proj").build();
1337 let ws_clone = ws.clone();
1338
1339 tokio::spawn(async move {
1340 tokio::time::sleep(Duration::from_millis(10)).await;
1341 // Same correlation, wrong kind.
1342 ws_clone.request_work("corr-wfk", "topic", "agent-a");
1343 // Wrong correlation, right kind.
1344 ws_clone.claim_work("other-corr", "topic", "agent-b");
1345 // Both match.
1346 ws_clone.claim_work("corr-wfk", "topic", "agent-b");
1347 });
1348
1349 let result = ws
1350 .wait_for_kind("corr-wfk", CollaborationEventKind::WorkClaimed, Duration::from_secs(1))
1351 .await;
1352 let event = result.expect("should receive matching event");
1353 assert_eq!(event.correlation_id, "corr-wfk");
1354 assert_eq!(event.kind, CollaborationEventKind::WorkClaimed);
1355 assert_eq!(event.producer, "agent-b");
1356 }
1357
1358 #[tokio::test]
1359 async fn wait_for_kind_times_out_when_kind_does_not_match() {
1360 let ws = Workspace::new("./proj").build();
1361 let ws_clone = ws.clone();
1362
1363 tokio::spawn(async move {
1364 tokio::time::sleep(Duration::from_millis(5)).await;
1365 // Right correlation, wrong kind.
1366 ws_clone.request_work("corr-to", "topic", "agent");
1367 });
1368
1369 let result = ws
1370 .wait_for_kind("corr-to", CollaborationEventKind::Completed, Duration::from_millis(100))
1371 .await;
1372 assert!(result.is_none());
1373 }
1374
1375 #[test]
1376 fn full_collaboration_flow_via_helpers() {
1377 let ws = Workspace::new("./proj").build();
1378
1379 // Coordinator requests work.
1380 ws.request_work("flow-1", "backend-api", "coordinator");
1381 // Specialist claims it.
1382 ws.claim_work("flow-1", "backend-api", "backend_engineer");
1383 // Specialist publishes the result.
1384 ws.publish_work(
1385 "flow-1",
1386 "backend-api",
1387 "backend_engineer",
1388 serde_json::json!({ "endpoints": 3 }),
1389 );
1390 // Specialist requests feedback.
1391 ws.request_feedback(
1392 "flow-1",
1393 "backend-api",
1394 "backend_engineer",
1395 serde_json::json!({ "review": "please" }),
1396 );
1397 // Reviewer provides feedback.
1398 ws.provide_feedback(
1399 "flow-1",
1400 "backend-api",
1401 "reviewer",
1402 serde_json::json!({ "approved": true }),
1403 );
1404 // Specialist signals completion.
1405 ws.signal_completed("flow-1", "backend-api", "backend_engineer");
1406
1407 let events = ws.events();
1408 assert_eq!(events.len(), 6);
1409 assert_eq!(events[0].kind, CollaborationEventKind::NeedWork);
1410 assert_eq!(events[1].kind, CollaborationEventKind::WorkClaimed);
1411 assert_eq!(events[2].kind, CollaborationEventKind::WorkPublished);
1412 assert_eq!(events[3].kind, CollaborationEventKind::FeedbackRequested);
1413 assert_eq!(events[4].kind, CollaborationEventKind::FeedbackProvided);
1414 assert_eq!(events[5].kind, CollaborationEventKind::Completed);
1415 }
1416}