1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
//! Canonical completion delivery seam for async operation terminals.
//!
//! The [`CompletionFeed`] trait provides a monotonic, append-only read handle
//! over operation terminal events. The runtime registry is the sole writer;
//! consumers (agent boundary, idle wake) are read-only.
//!
//! This replaces the prior parallel-truth paths (shell job projection,
//! detached wake booleans, poll_external_updates background_completions).
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::ops::OperationId;
use crate::ops_lifecycle::{OperationKind, OperationTerminalOutcome};
/// Monotonic sequence number assigned to each completion event.
pub type CompletionSeq = u64;
/// A single completion event in the feed.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompletionEntry {
pub seq: CompletionSeq,
pub operation_id: OperationId,
pub kind: OperationKind,
pub display_name: String,
pub terminal_outcome: OperationTerminalOutcome,
pub completed_at_ms: Option<u64>,
}
/// A batch of completion entries with an atomically-captured watermark.
///
/// The `watermark` is captured at the same instant as the entry snapshot,
/// ensuring no completions land between the read and the watermark check.
#[derive(Debug, Clone)]
pub struct CompletionBatch {
pub entries: Vec<CompletionEntry>,
/// Feed watermark at the time entries were read.
pub watermark: CompletionSeq,
}
/// Read-only handle to the canonical completion feed.
///
/// Consumers call [`list_since`](CompletionFeed::list_since) to drain entries
/// past a local cursor, and [`wait_for_advance`](CompletionFeed::wait_for_advance)
/// to block until the watermark advances past a given sequence.
pub trait CompletionFeed: Send + Sync + std::fmt::Debug {
/// Current feed watermark (highest seq written).
fn watermark(&self) -> CompletionSeq;
/// Return all entries with `seq > after_seq`, plus the current watermark.
///
/// The watermark in the returned batch is captured atomically with the
/// entry snapshot — no entry can land between the snapshot and the
/// watermark read.
fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch;
/// Block until the feed watermark advances past `after_seq`.
///
/// Returns the new watermark. If the watermark is already past `after_seq`,
/// returns immediately.
fn wait_for_advance(
&self,
after_seq: CompletionSeq,
) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>>;
}
// ---------------------------------------------------------------------------
// Enrichment provider for agent-boundary completion projection
// ---------------------------------------------------------------------------
/// Shell-level enrichment for a completed operation.
///
/// Carries display details (job ID, detail string) that the agent boundary
/// needs for `BackgroundJobCompleted` events. These come from the shell
/// `JobManager`, not from the feed itself.
#[derive(Debug, Clone)]
pub struct CompletionEnrichmentData {
pub job_id: String,
pub detail: String,
}
/// Provider of shell-level enrichment for completed operations.
///
/// The shell `JobManager` implements this trait. The agent boundary
/// calls [`enrich`](CompletionEnrichmentProvider::enrich) to look up
/// display details by operation ID.
pub trait CompletionEnrichmentProvider: Send + Sync {
fn enrich(&self, operation_id: &OperationId) -> Option<CompletionEnrichmentData>;
}
// ---------------------------------------------------------------------------
// Arc delegation
// ---------------------------------------------------------------------------
impl<T: CompletionFeed + ?Sized> CompletionFeed for Arc<T> {
fn watermark(&self) -> CompletionSeq {
(**self).watermark()
}
fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
(**self).list_since(after_seq)
}
fn wait_for_advance(
&self,
after_seq: CompletionSeq,
) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
(**self).wait_for_advance(after_seq)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, dead_code)]
pub(crate) mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
/// Simple in-memory mock for testing feed-dependent code paths.
#[derive(Debug)]
pub struct MockCompletionFeed {
watermark: AtomicU64,
entries: std::sync::Mutex<Vec<CompletionEntry>>,
}
impl MockCompletionFeed {
pub fn new() -> Self {
Self {
watermark: AtomicU64::new(0),
entries: std::sync::Mutex::new(Vec::new()),
}
}
/// Create a feed pre-seeded at a given watermark (simulates prior activity).
pub fn with_watermark(watermark: CompletionSeq) -> Self {
Self {
watermark: AtomicU64::new(watermark),
entries: std::sync::Mutex::new(Vec::new()),
}
}
pub fn push(&self, entry: CompletionEntry) {
let mut entries = self.entries.lock().unwrap();
self.watermark.store(entry.seq, Ordering::Release);
entries.push(entry);
}
}
impl CompletionFeed for MockCompletionFeed {
fn watermark(&self) -> CompletionSeq {
self.watermark.load(Ordering::Acquire)
}
fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
let entries = self.entries.lock().unwrap();
CompletionBatch {
entries: entries
.iter()
.filter(|e| e.seq > after_seq)
.cloned()
.collect(),
watermark: self.watermark.load(Ordering::Acquire),
}
}
fn wait_for_advance(
&self,
_after_seq: CompletionSeq,
) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
Box::pin(async move { self.watermark.load(Ordering::Acquire) })
}
}
#[test]
fn mock_feed_with_watermark_seeds_at_expected_value() {
let feed = MockCompletionFeed::with_watermark(42);
assert_eq!(feed.watermark(), 42);
assert!(feed.list_since(0).entries.is_empty());
}
}