1use crate::arena::{ArenaReader, ArenaWriter};
4use crate::error::Result;
5use crate::testing::providers::RealSecrets;
6use crate::testing::providers::{
7 ClockProvider, EnvProvider, FsProvider, HttpProvider, RealClock, RealEnv, RealFs, RealHttp,
8 RealRng, RealUuid, RngProvider, SecretsProvider, UuidProvider,
9};
10use crate::traits::trigger::TriggerType;
11use crate::types::{ArenaOffset, NodeId, RelPtr, TraceId};
12use crate::wal::Wal;
13use std::sync::Arc;
14
15pub struct Context {
32 trace_id: TraceId,
34 node_id: NodeId,
36 reader: ArenaReader,
38 writer: ArenaWriter,
40 wal: Arc<Wal>,
42
43 clock: Arc<dyn ClockProvider>,
46 http: Arc<dyn HttpProvider>,
48 rng: Arc<dyn RngProvider>,
50 uuid: Arc<dyn UuidProvider>,
52 fs: Arc<dyn FsProvider>,
54 env: Arc<dyn EnvProvider>,
56 secrets: Arc<dyn SecretsProvider>,
58}
59
60impl Context {
61 pub fn new(
63 trace_id: TraceId,
64 node_id: NodeId,
65 reader: ArenaReader,
66 writer: ArenaWriter,
67 wal: Arc<Wal>,
68 ) -> Self {
69 Self {
70 trace_id,
71 node_id,
72 reader,
73 writer,
74 wal,
75 clock: Arc::new(RealClock::new()),
76 http: Arc::new(RealHttp::new()),
77 rng: Arc::new(RealRng::new()),
78 uuid: Arc::new(RealUuid::new()),
79 fs: Arc::new(RealFs::new()),
80 env: Arc::new(RealEnv::new()),
81 secrets: Arc::new(RealSecrets::default()),
82 }
83 }
84
85 #[allow(clippy::too_many_arguments)]
89 pub fn with_providers(
90 trace_id: TraceId,
91 node_id: NodeId,
92 reader: ArenaReader,
93 writer: ArenaWriter,
94 wal: Arc<Wal>,
95 clock: Arc<dyn ClockProvider>,
96 http: Arc<dyn HttpProvider>,
97 rng: Arc<dyn RngProvider>,
98 uuid: Arc<dyn UuidProvider>,
99 fs: Arc<dyn FsProvider>,
100 env: Arc<dyn EnvProvider>,
101 secrets: Arc<dyn SecretsProvider>,
102 ) -> Self {
103 Self {
104 trace_id,
105 node_id,
106 reader,
107 writer,
108 wal,
109 clock,
110 http,
111 rng,
112 uuid,
113 fs,
114 env,
115 secrets,
116 }
117 }
118
119 pub fn trace_id(&self) -> TraceId {
121 self.trace_id
122 }
123
124 pub fn node_id(&self) -> NodeId {
126 self.node_id
127 }
128
129 pub fn read_bytes(&self, ptr: RelPtr<()>) -> Result<Vec<u8>> {
131 self.reader.read_bytes(ptr.offset(), ptr.size() as usize)
132 }
133
134 pub fn write_bytes(&self, bytes: &[u8]) -> Result<RelPtr<()>> {
136 self.writer.write_bytes(bytes)
137 }
138
139 pub fn read_raw(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
141 self.reader.read_bytes(offset, size)
142 }
143
144 pub fn write_raw(&self, bytes: &[u8]) -> Result<RelPtr<()>> {
146 self.writer.write_bytes(bytes)
147 }
148
149 pub fn log(&self, message: impl AsRef<str>) {
151 tracing::info!(
152 trace_id = %self.trace_id,
153 node_id = %self.node_id,
154 "{}",
155 message.as_ref()
156 );
157 }
158
159 pub fn warn(&self, message: impl AsRef<str>) {
161 tracing::warn!(
162 trace_id = %self.trace_id,
163 node_id = %self.node_id,
164 "{}",
165 message.as_ref()
166 );
167 }
168
169 pub fn error(&self, message: impl AsRef<str>) {
171 tracing::error!(
172 trace_id = %self.trace_id,
173 node_id = %self.node_id,
174 "{}",
175 message.as_ref()
176 );
177 }
178
179 pub fn write_position(&self) -> ArenaOffset {
181 self.writer.write_position()
182 }
183
184 pub fn wal(&self) -> &Wal {
186 &self.wal
187 }
188
189 pub fn clock(&self) -> &dyn ClockProvider {
193 &*self.clock
194 }
195
196 pub fn http(&self) -> &dyn HttpProvider {
198 &*self.http
199 }
200
201 pub fn rng(&self) -> &dyn RngProvider {
203 &*self.rng
204 }
205
206 pub fn uuid_provider(&self) -> &dyn UuidProvider {
208 &*self.uuid
209 }
210
211 pub fn fs(&self) -> &dyn FsProvider {
213 &*self.fs
214 }
215
216 pub fn env_provider(&self) -> &dyn EnvProvider {
218 &*self.env
219 }
220
221 pub fn secrets(&self) -> &dyn SecretsProvider {
223 &*self.secrets
224 }
225
226 pub fn now(&self) -> u64 {
230 self.clock.now()
231 }
232
233 pub fn system_time_millis(&self) -> u64 {
235 self.clock.system_time_millis()
236 }
237
238 pub fn new_uuid(&self) -> uuid::Uuid {
240 self.uuid.new_v4()
241 }
242
243 pub fn random_u64(&self) -> u64 {
245 self.rng.next_u64()
246 }
247
248 pub fn random_f64(&self) -> f64 {
250 self.rng.next_f64()
251 }
252
253 pub fn env_var(&self, key: &str) -> Option<String> {
255 self.env.var(key)
256 }
257
258 pub fn secret(&self, key: &str) -> Option<String> {
260 self.secrets.get(key)
261 }
262}
263
264#[derive(Clone)]
266pub struct PipelineCtx {
267 pub name: String,
269 pub version: u32,
271 pub triggers: TriggerController,
273}
274
275impl PipelineCtx {
276 pub fn new(name: impl Into<String>, version: u32) -> Self {
278 Self {
279 name: name.into(),
280 version,
281 triggers: TriggerController::new(),
282 }
283 }
284
285 pub fn log(&self, message: impl AsRef<str>) {
287 tracing::info!(
288 pipeline = %self.name,
289 version = %self.version,
290 "{}",
291 message.as_ref()
292 );
293 }
294}
295
/// Handle exposing pause/resume operations for triggers.
///
/// The type carries no data of its own.
#[derive(Clone)]
pub struct TriggerController {
    /// Private unit field: keeps the struct from being built with a literal
    /// outside this module, so construction goes through `new`/`default`.
    _private: (),
}
302
303impl TriggerController {
304 pub fn new() -> Self {
306 Self { _private: () }
307 }
308
309 pub fn pause(&self, _trigger_type: TriggerType) {
311 tracing::info!("Pausing trigger");
313 }
314
315 pub fn resume(&self, _trigger_type: TriggerType) {
317 tracing::info!("Resuming trigger");
318 }
319
320 pub fn pause_all(&self) {
322 tracing::info!("Pausing all triggers");
323 }
324
325 pub fn resume_all(&self) {
327 tracing::info!("Resuming all triggers");
328 }
329}
330
331impl Default for TriggerController {
332 fn default() -> Self {
333 Self::new()
334 }
335}