Skip to main content

pi/
agent_cx.rs

1//! Capability-scoped async context for Pi.
2//!
3//! Pi builds on `asupersync` which provides a capability-based [`asupersync::Cx`] for cancellation,
4//! budgeting, and deterministic testing hooks. Historically this codebase has sometimes passed raw
5//! `Cx` instances around ad-hoc (or constructed them at call sites), which makes it harder to audit
6//! the *intended* capability boundary between subsystems.
7//!
8//! `AgentCx` is a thin, explicit wrapper used at API boundaries (agent loop ↔ tools ↔ sessions ↔
9//! RPC). It is intentionally small: it does **not** try to introduce a new runtime or replace
10//! `asupersync::Cx`; it just centralizes how Pi threads context through async code.
11
12use asupersync::{Budget, Cx};
13use std::ops::Deref;
14use std::path::Path;
15use std::time::Duration;
16
17/// A capability-scoped context for agent operations.
18///
19/// ## Construction
20/// - **Production:** prefer constructing once per top-level request/run and passing `&AgentCx`
21///   through.
22/// - **Tests:** use [`Self::for_testing`] / [`Self::for_testing_with_io`] to avoid ambient
23///   dependencies and to keep runs deterministic.
24#[derive(Debug, Clone)]
25pub struct AgentCx {
26    cx: Cx,
27}
28
29impl AgentCx {
30    /// Wrap an existing `asupersync::Cx`.
31    #[must_use]
32    pub const fn from_cx(cx: Cx) -> Self {
33        Self { cx }
34    }
35
36    /// Use the ambient context when available, otherwise fall back to a request-scoped context.
37    ///
38    /// This is useful when helper code may run either inside an already-scoped async task
39    /// (where inheriting the current cancellation/budget is desirable) or at a top-level entry
40    /// point that needs to create a fresh request context.
41    #[must_use]
42    pub fn for_current_or_request() -> Self {
43        Self {
44            cx: Cx::current().unwrap_or_else(Cx::for_request),
45        }
46    }
47
48    /// Create a request-scoped context (infinite budget).
49    #[must_use]
50    pub fn for_request() -> Self {
51        Self {
52            cx: Cx::for_request(),
53        }
54    }
55
56    /// Create a request-scoped context with an explicit budget.
57    #[must_use]
58    pub fn for_request_with_budget(budget: Budget) -> Self {
59        Self {
60            cx: Cx::for_request_with_budget(budget),
61        }
62    }
63
64    /// Create a test-only context (infinite budget).
65    #[must_use]
66    pub fn for_testing() -> Self {
67        Self {
68            cx: Cx::for_testing(),
69        }
70    }
71
72    /// Create a test-only context with lab I/O capability.
73    #[must_use]
74    pub fn for_testing_with_io() -> Self {
75        Self {
76            cx: Cx::for_testing_with_io(),
77        }
78    }
79
80    /// Borrow the underlying `asupersync` context.
81    #[must_use]
82    pub const fn cx(&self) -> &Cx {
83        &self.cx
84    }
85
86    /// Filesystem capability accessor.
87    #[must_use]
88    pub const fn fs(&self) -> AgentFs<'_> {
89        AgentFs { _cx: self }
90    }
91
92    /// Time capability accessor.
93    #[must_use]
94    pub const fn time(&self) -> AgentTime<'_> {
95        AgentTime { cx: self }
96    }
97
98    /// HTTP capability accessor.
99    #[must_use]
100    pub const fn http(&self) -> AgentHttp<'_> {
101        AgentHttp { _cx: self }
102    }
103
104    /// Process capability accessor.
105    #[must_use]
106    pub const fn process(&self) -> AgentProcess<'_> {
107        AgentProcess { _cx: self }
108    }
109}
110
111impl Deref for AgentCx {
112    type Target = Cx;
113
114    fn deref(&self) -> &Self::Target {
115        self.cx()
116    }
117}
118
119/// Filesystem-related operations.
120pub struct AgentFs<'a> {
121    _cx: &'a AgentCx,
122}
123
124impl AgentFs<'_> {
125    pub async fn read(&self, path: impl AsRef<Path>) -> std::io::Result<Vec<u8>> {
126        asupersync::fs::read(path).await
127    }
128
129    pub async fn write(
130        &self,
131        path: impl AsRef<Path>,
132        contents: impl AsRef<[u8]>,
133    ) -> std::io::Result<()> {
134        asupersync::fs::write(path, contents).await
135    }
136
137    pub async fn create_dir_all(&self, path: impl AsRef<Path>) -> std::io::Result<()> {
138        asupersync::fs::create_dir_all(path).await
139    }
140}
141
142/// Time-related operations.
143pub struct AgentTime<'a> {
144    cx: &'a AgentCx,
145}
146
147impl AgentTime<'_> {
148    pub async fn sleep(&self, duration: Duration) {
149        let now = self
150            .cx
151            .cx()
152            .timer_driver()
153            .map_or_else(asupersync::time::wall_now, |timer| timer.now());
154        asupersync::time::sleep(now, duration).await;
155    }
156}
157
158/// HTTP-related operations.
159pub struct AgentHttp<'a> {
160    _cx: &'a AgentCx,
161}
162
163impl AgentHttp<'_> {
164    #[must_use]
165    pub fn client(&self) -> crate::http::client::Client {
166        crate::http::client::Client::new()
167    }
168}
169
170/// Process-related operations.
171pub struct AgentProcess<'a> {
172    _cx: &'a AgentCx,
173}
174
175impl AgentProcess<'_> {
176    #[must_use]
177    pub fn command(&self, program: &str) -> std::process::Command {
178        std::process::Command::new(program)
179    }
180}
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185
186    #[test]
187    fn for_request_creates_valid_context() {
188        let cx = AgentCx::for_request();
189        // Verify the inner Cx is accessible.
190        let _ = cx.cx();
191    }
192
193    #[test]
194    fn from_cx_wraps_existing_context() {
195        let inner = Cx::for_request();
196        let cx = AgentCx::from_cx(inner);
197        let _ = cx.cx();
198    }
199
200    #[test]
201    fn for_current_or_request_creates_valid_context() {
202        let cx = AgentCx::for_current_or_request();
203        let _ = cx.cx();
204    }
205
206    #[test]
207    fn for_testing_creates_valid_context() {
208        let cx = AgentCx::for_testing();
209        let _ = cx.cx();
210    }
211
212    #[test]
213    fn for_testing_with_io_creates_valid_context() {
214        let cx = AgentCx::for_testing_with_io();
215        let _ = cx.cx();
216    }
217
218    #[test]
219    fn for_request_with_budget_creates_valid_context() {
220        let budget = Budget::new().with_poll_quota(100);
221        let cx = AgentCx::for_request_with_budget(budget);
222        let _ = cx.cx();
223    }
224
225    #[test]
226    fn fs_accessor_returns_agent_fs() {
227        let cx = AgentCx::for_testing();
228        let _fs = cx.fs();
229    }
230
231    #[test]
232    fn time_accessor_returns_agent_time() {
233        let cx = AgentCx::for_testing();
234        let _time = cx.time();
235    }
236
237    #[test]
238    fn http_accessor_returns_agent_http() {
239        let cx = AgentCx::for_testing();
240        let _http = cx.http();
241    }
242
243    #[test]
244    fn process_accessor_returns_agent_process() {
245        let cx = AgentCx::for_testing();
246        let _proc = cx.process();
247    }
248
249    #[test]
250    fn process_command_creates_command() {
251        let cx = AgentCx::for_testing();
252        let cmd = cx.process().command("echo");
253        assert_eq!(cmd.get_program(), "echo");
254    }
255
256    #[test]
257    fn agent_cx_is_clone() {
258        let cx = AgentCx::for_testing();
259        let cx2 = cx.clone();
260        let _ = cx.cx();
261        let _ = cx2.cx();
262    }
263}