xerv_core/traits/
context.rs

1//! Execution context provided to nodes.
2
3use crate::arena::{ArenaReader, ArenaWriter};
4use crate::error::Result;
5use crate::testing::providers::RealSecrets;
6use crate::testing::providers::{
7    ClockProvider, EnvProvider, FsProvider, HttpProvider, RealClock, RealEnv, RealFs, RealHttp,
8    RealRng, RealUuid, RngProvider, SecretsProvider, UuidProvider,
9};
10use crate::traits::trigger::TriggerType;
11use crate::types::{ArenaOffset, NodeId, RelPtr, TraceId};
12use crate::wal::Wal;
13use std::sync::Arc;
14
15/// Execution context provided to nodes during execution.
16///
17/// The context provides access to:
18/// - Arena for reading/writing data
19/// - Pipeline configuration
20/// - Trace metadata
21/// - Logging and metrics
22/// - External providers (clock, HTTP, RNG, etc.)
23///
24/// # Providers
25///
26/// The context includes pluggable providers for external dependencies,
27/// enabling deterministic testing by mocking these dependencies.
28///
29/// In production, real providers are used automatically. In tests,
30/// use `TestContext` or the `with_providers()` constructor to inject mocks.
31pub struct Context {
32    /// The trace ID for this execution.
33    trace_id: TraceId,
34    /// The node ID being executed.
35    node_id: NodeId,
36    /// Arena reader for accessing upstream outputs.
37    reader: ArenaReader,
38    /// Arena writer for storing outputs.
39    writer: ArenaWriter,
40    /// WAL for durability.
41    wal: Arc<Wal>,
42
43    // Providers for external dependencies
44    /// Clock provider for time operations.
45    clock: Arc<dyn ClockProvider>,
46    /// HTTP provider for network requests.
47    http: Arc<dyn HttpProvider>,
48    /// RNG provider for random number generation.
49    rng: Arc<dyn RngProvider>,
50    /// UUID provider for UUID generation.
51    uuid: Arc<dyn UuidProvider>,
52    /// Filesystem provider for file operations.
53    fs: Arc<dyn FsProvider>,
54    /// Environment provider for environment variables.
55    env: Arc<dyn EnvProvider>,
56    /// Secrets provider for secret management.
57    secrets: Arc<dyn SecretsProvider>,
58}
59
60impl Context {
61    /// Create a new execution context with default (real) providers.
62    pub fn new(
63        trace_id: TraceId,
64        node_id: NodeId,
65        reader: ArenaReader,
66        writer: ArenaWriter,
67        wal: Arc<Wal>,
68    ) -> Self {
69        Self {
70            trace_id,
71            node_id,
72            reader,
73            writer,
74            wal,
75            clock: Arc::new(RealClock::new()),
76            http: Arc::new(RealHttp::new()),
77            rng: Arc::new(RealRng::new()),
78            uuid: Arc::new(RealUuid::new()),
79            fs: Arc::new(RealFs::new()),
80            env: Arc::new(RealEnv::new()),
81            secrets: Arc::new(RealSecrets::default()),
82        }
83    }
84
85    /// Create a new execution context with custom providers.
86    ///
87    /// This is primarily used for testing to inject mock providers.
88    #[allow(clippy::too_many_arguments)]
89    pub fn with_providers(
90        trace_id: TraceId,
91        node_id: NodeId,
92        reader: ArenaReader,
93        writer: ArenaWriter,
94        wal: Arc<Wal>,
95        clock: Arc<dyn ClockProvider>,
96        http: Arc<dyn HttpProvider>,
97        rng: Arc<dyn RngProvider>,
98        uuid: Arc<dyn UuidProvider>,
99        fs: Arc<dyn FsProvider>,
100        env: Arc<dyn EnvProvider>,
101        secrets: Arc<dyn SecretsProvider>,
102    ) -> Self {
103        Self {
104            trace_id,
105            node_id,
106            reader,
107            writer,
108            wal,
109            clock,
110            http,
111            rng,
112            uuid,
113            fs,
114            env,
115            secrets,
116        }
117    }
118
119    /// Get the current trace ID.
120    pub fn trace_id(&self) -> TraceId {
121        self.trace_id
122    }
123
124    /// Get the current node ID.
125    pub fn node_id(&self) -> NodeId {
126        self.node_id
127    }
128
129    /// Read bytes from the arena using a relative pointer.
130    pub fn read_bytes(&self, ptr: RelPtr<()>) -> Result<Vec<u8>> {
131        self.reader.read_bytes(ptr.offset(), ptr.size() as usize)
132    }
133
134    /// Write bytes to the arena and return a relative pointer.
135    pub fn write_bytes(&self, bytes: &[u8]) -> Result<RelPtr<()>> {
136        self.writer.write_bytes(bytes)
137    }
138
139    /// Read raw bytes from the arena at a specific offset.
140    pub fn read_raw(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
141        self.reader.read_bytes(offset, size)
142    }
143
144    /// Write raw bytes to the arena.
145    pub fn write_raw(&self, bytes: &[u8]) -> Result<RelPtr<()>> {
146        self.writer.write_bytes(bytes)
147    }
148
149    /// Log a message associated with this execution.
150    pub fn log(&self, message: impl AsRef<str>) {
151        tracing::info!(
152            trace_id = %self.trace_id,
153            node_id = %self.node_id,
154            "{}",
155            message.as_ref()
156        );
157    }
158
159    /// Log a warning message.
160    pub fn warn(&self, message: impl AsRef<str>) {
161        tracing::warn!(
162            trace_id = %self.trace_id,
163            node_id = %self.node_id,
164            "{}",
165            message.as_ref()
166        );
167    }
168
169    /// Log an error message.
170    pub fn error(&self, message: impl AsRef<str>) {
171        tracing::error!(
172            trace_id = %self.trace_id,
173            node_id = %self.node_id,
174            "{}",
175            message.as_ref()
176        );
177    }
178
179    /// Get the current write position in the arena.
180    pub fn write_position(&self) -> ArenaOffset {
181        self.writer.write_position()
182    }
183
184    /// Get the WAL handle.
185    pub fn wal(&self) -> &Wal {
186        &self.wal
187    }
188
189    // Provider accessors
190
191    /// Get the clock provider.
192    pub fn clock(&self) -> &dyn ClockProvider {
193        &*self.clock
194    }
195
196    /// Get the HTTP provider.
197    pub fn http(&self) -> &dyn HttpProvider {
198        &*self.http
199    }
200
201    /// Get the RNG provider.
202    pub fn rng(&self) -> &dyn RngProvider {
203        &*self.rng
204    }
205
206    /// Get the UUID provider.
207    pub fn uuid_provider(&self) -> &dyn UuidProvider {
208        &*self.uuid
209    }
210
211    /// Get the filesystem provider.
212    pub fn fs(&self) -> &dyn FsProvider {
213        &*self.fs
214    }
215
216    /// Get the environment provider.
217    pub fn env_provider(&self) -> &dyn EnvProvider {
218        &*self.env
219    }
220
221    /// Get the secrets provider.
222    pub fn secrets(&self) -> &dyn SecretsProvider {
223        &*self.secrets
224    }
225
226    // Convenience methods that use providers
227
228    /// Get current time in nanoseconds (monotonic).
229    pub fn now(&self) -> u64 {
230        self.clock.now()
231    }
232
233    /// Get current system time in milliseconds since UNIX epoch.
234    pub fn system_time_millis(&self) -> u64 {
235        self.clock.system_time_millis()
236    }
237
238    /// Generate a new UUID.
239    pub fn new_uuid(&self) -> uuid::Uuid {
240        self.uuid.new_v4()
241    }
242
243    /// Generate a random u64.
244    pub fn random_u64(&self) -> u64 {
245        self.rng.next_u64()
246    }
247
248    /// Generate a random f64 in [0, 1).
249    pub fn random_f64(&self) -> f64 {
250        self.rng.next_f64()
251    }
252
253    /// Get an environment variable.
254    pub fn env_var(&self, key: &str) -> Option<String> {
255        self.env.var(key)
256    }
257
258    /// Get a secret.
259    pub fn secret(&self, key: &str) -> Option<String> {
260        self.secrets.get(key)
261    }
262}
263
264/// Context for pipeline lifecycle hooks.
265#[derive(Clone)]
266pub struct PipelineCtx {
267    /// Pipeline name.
268    pub name: String,
269    /// Pipeline version.
270    pub version: u32,
271    /// Trigger controller.
272    pub triggers: TriggerController,
273}
274
275impl PipelineCtx {
276    /// Create a new pipeline context.
277    pub fn new(name: impl Into<String>, version: u32) -> Self {
278        Self {
279            name: name.into(),
280            version,
281            triggers: TriggerController::new(),
282        }
283    }
284
285    /// Log a message for the pipeline.
286    pub fn log(&self, message: impl AsRef<str>) {
287        tracing::info!(
288            pipeline = %self.name,
289            version = %self.version,
290            "{}",
291            message.as_ref()
292        );
293    }
294}
295
296/// Controller for managing triggers within a pipeline.
297#[derive(Clone)]
298pub struct TriggerController {
299    // Internal state for trigger management
300    _private: (),
301}
302
303impl TriggerController {
304    /// Create a new trigger controller.
305    pub fn new() -> Self {
306        Self { _private: () }
307    }
308
309    /// Pause a specific trigger type.
310    pub fn pause(&self, _trigger_type: TriggerType) {
311        // Implementation will be in the executor
312        tracing::info!("Pausing trigger");
313    }
314
315    /// Resume a specific trigger type.
316    pub fn resume(&self, _trigger_type: TriggerType) {
317        tracing::info!("Resuming trigger");
318    }
319
320    /// Pause all triggers.
321    pub fn pause_all(&self) {
322        tracing::info!("Pausing all triggers");
323    }
324
325    /// Resume all triggers.
326    pub fn resume_all(&self) {
327        tracing::info!("Resuming all triggers");
328    }
329}
330
331impl Default for TriggerController {
332    fn default() -> Self {
333        Self::new()
334    }
335}