use crate::arena::{ArenaReader, ArenaWriter};
use crate::error::Result;
use crate::testing::providers::RealSecrets;
use crate::testing::providers::{
ClockProvider, EnvProvider, FsProvider, HttpProvider, RealClock, RealEnv, RealFs, RealHttp,
RealRng, RealUuid, RngProvider, SecretsProvider, UuidProvider,
};
use crate::traits::trigger::TriggerType;
use crate::types::{ArenaOffset, NodeId, RelPtr, TraceId};
use crate::wal::Wal;
use std::sync::Arc;
pub struct Context {
trace_id: TraceId,
node_id: NodeId,
reader: ArenaReader,
writer: ArenaWriter,
wal: Arc<Wal>,
clock: Arc<dyn ClockProvider>,
http: Arc<dyn HttpProvider>,
rng: Arc<dyn RngProvider>,
uuid: Arc<dyn UuidProvider>,
fs: Arc<dyn FsProvider>,
env: Arc<dyn EnvProvider>,
secrets: Arc<dyn SecretsProvider>,
}
impl Context {
pub fn new(
trace_id: TraceId,
node_id: NodeId,
reader: ArenaReader,
writer: ArenaWriter,
wal: Arc<Wal>,
) -> Self {
Self {
trace_id,
node_id,
reader,
writer,
wal,
clock: Arc::new(RealClock::new()),
http: Arc::new(RealHttp::new()),
rng: Arc::new(RealRng::new()),
uuid: Arc::new(RealUuid::new()),
fs: Arc::new(RealFs::new()),
env: Arc::new(RealEnv::new()),
secrets: Arc::new(RealSecrets::default()),
}
}
#[allow(clippy::too_many_arguments)]
pub fn with_providers(
trace_id: TraceId,
node_id: NodeId,
reader: ArenaReader,
writer: ArenaWriter,
wal: Arc<Wal>,
clock: Arc<dyn ClockProvider>,
http: Arc<dyn HttpProvider>,
rng: Arc<dyn RngProvider>,
uuid: Arc<dyn UuidProvider>,
fs: Arc<dyn FsProvider>,
env: Arc<dyn EnvProvider>,
secrets: Arc<dyn SecretsProvider>,
) -> Self {
Self {
trace_id,
node_id,
reader,
writer,
wal,
clock,
http,
rng,
uuid,
fs,
env,
secrets,
}
}
pub fn trace_id(&self) -> TraceId {
self.trace_id
}
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn read_bytes(&self, ptr: RelPtr<()>) -> Result<Vec<u8>> {
self.reader.read_bytes(ptr.offset(), ptr.size() as usize)
}
pub fn write_bytes(&self, bytes: &[u8]) -> Result<RelPtr<()>> {
self.writer.write_bytes(bytes)
}
pub fn read_raw(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
self.reader.read_bytes(offset, size)
}
pub fn write_raw(&self, bytes: &[u8]) -> Result<RelPtr<()>> {
self.writer.write_bytes(bytes)
}
pub fn log(&self, message: impl AsRef<str>) {
tracing::info!(
trace_id = %self.trace_id,
node_id = %self.node_id,
"{}",
message.as_ref()
);
}
pub fn warn(&self, message: impl AsRef<str>) {
tracing::warn!(
trace_id = %self.trace_id,
node_id = %self.node_id,
"{}",
message.as_ref()
);
}
pub fn error(&self, message: impl AsRef<str>) {
tracing::error!(
trace_id = %self.trace_id,
node_id = %self.node_id,
"{}",
message.as_ref()
);
}
pub fn write_position(&self) -> ArenaOffset {
self.writer.write_position()
}
pub fn wal(&self) -> &Wal {
&self.wal
}
pub fn clock(&self) -> &dyn ClockProvider {
&*self.clock
}
pub fn http(&self) -> &dyn HttpProvider {
&*self.http
}
pub fn rng(&self) -> &dyn RngProvider {
&*self.rng
}
pub fn uuid_provider(&self) -> &dyn UuidProvider {
&*self.uuid
}
pub fn fs(&self) -> &dyn FsProvider {
&*self.fs
}
pub fn env_provider(&self) -> &dyn EnvProvider {
&*self.env
}
pub fn secrets(&self) -> &dyn SecretsProvider {
&*self.secrets
}
pub fn now(&self) -> u64 {
self.clock.now()
}
pub fn system_time_millis(&self) -> u64 {
self.clock.system_time_millis()
}
pub fn new_uuid(&self) -> uuid::Uuid {
self.uuid.new_v4()
}
pub fn random_u64(&self) -> u64 {
self.rng.next_u64()
}
pub fn random_f64(&self) -> f64 {
self.rng.next_f64()
}
pub fn env_var(&self, key: &str) -> Option<String> {
self.env.var(key)
}
pub fn secret(&self, key: &str) -> Option<String> {
self.secrets.get(key)
}
}
#[derive(Clone)]
pub struct PipelineCtx {
pub name: String,
pub version: u32,
pub triggers: TriggerController,
}
impl PipelineCtx {
pub fn new(name: impl Into<String>, version: u32) -> Self {
Self {
name: name.into(),
version,
triggers: TriggerController::new(),
}
}
pub fn log(&self, message: impl AsRef<str>) {
tracing::info!(
pipeline = %self.name,
version = %self.version,
"{}",
message.as_ref()
);
}
}
#[derive(Clone)]
pub struct TriggerController {
_private: (),
}
impl TriggerController {
pub fn new() -> Self {
Self { _private: () }
}
pub fn pause(&self, _trigger_type: TriggerType) {
tracing::info!("Pausing trigger");
}
pub fn resume(&self, _trigger_type: TriggerType) {
tracing::info!("Resuming trigger");
}
pub fn pause_all(&self) {
tracing::info!("Pausing all triggers");
}
pub fn resume_all(&self) {
tracing::info!("Resuming all triggers");
}
}
impl Default for TriggerController {
fn default() -> Self {
Self::new()
}
}