Skip to main content

forge_core/cron/
context.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use tracing::Span;
5use uuid::Uuid;
6
7use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
8use crate::function::AuthContext;
9
10/// Context available to cron handlers.
11pub struct CronContext {
12    /// Cron run ID.
13    pub run_id: Uuid,
14    /// Cron name.
15    pub cron_name: String,
16    /// Scheduled time (when the cron was supposed to run).
17    pub scheduled_time: DateTime<Utc>,
18    /// Actual execution time.
19    pub execution_time: DateTime<Utc>,
20    /// Timezone of the cron.
21    pub timezone: String,
22    /// Whether this is a catch-up run.
23    pub is_catch_up: bool,
24    /// Authentication context.
25    pub auth: AuthContext,
26    /// Database pool.
27    db_pool: sqlx::PgPool,
28    /// HTTP client.
29    http_client: reqwest::Client,
30    /// Structured logger.
31    pub log: CronLog,
32    /// Environment variable provider.
33    env_provider: Arc<dyn EnvProvider>,
34    /// Parent span for trace propagation.
35    span: Span,
36}
37
38impl CronContext {
39    /// Create a new cron context.
40    pub fn new(
41        run_id: Uuid,
42        cron_name: String,
43        scheduled_time: DateTime<Utc>,
44        timezone: String,
45        is_catch_up: bool,
46        db_pool: sqlx::PgPool,
47        http_client: reqwest::Client,
48    ) -> Self {
49        Self {
50            run_id,
51            cron_name: cron_name.clone(),
52            scheduled_time,
53            execution_time: Utc::now(),
54            timezone,
55            is_catch_up,
56            auth: AuthContext::unauthenticated(),
57            db_pool,
58            http_client,
59            log: CronLog::new(cron_name),
60            env_provider: Arc::new(RealEnvProvider::new()),
61            span: Span::current(),
62        }
63    }
64
65    /// Set environment provider.
66    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
67        self.env_provider = provider;
68        self
69    }
70
71    pub fn db(&self) -> &sqlx::PgPool {
72        &self.db_pool
73    }
74
75    /// Returns a `DbConn` wrapping the pool for shared helper functions.
76    pub fn db_conn(&self) -> crate::function::DbConn<'_> {
77        crate::function::DbConn::Pool(&self.db_pool)
78    }
79
80    /// Acquire a connection compatible with sqlx compile-time checked macros.
81    pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
82        Ok(crate::function::ForgeConn::Pool(
83            self.db_pool.acquire().await?,
84        ))
85    }
86
87    pub fn http(&self) -> &reqwest::Client {
88        &self.http_client
89    }
90
91    /// Get the delay between scheduled and actual execution time.
92    pub fn delay(&self) -> chrono::Duration {
93        self.execution_time - self.scheduled_time
94    }
95
96    /// Check if the cron is running late (more than 1 minute delay).
97    pub fn is_late(&self) -> bool {
98        self.delay() > chrono::Duration::minutes(1)
99    }
100
101    /// Set authentication context.
102    pub fn with_auth(mut self, auth: AuthContext) -> Self {
103        self.auth = auth;
104        self
105    }
106
107    /// Get the trace ID for this cron execution.
108    ///
109    /// Returns the trace ID if OpenTelemetry is configured, otherwise returns the run_id.
110    pub fn trace_id(&self) -> String {
111        // The span carries the trace context. When OTel is configured,
112        // the trace ID can be extracted from the span context.
113        // For now, return the run_id as a fallback correlation ID.
114        self.run_id.to_string()
115    }
116
117    /// Get the parent span for trace propagation.
118    ///
119    /// Use this to create child spans within cron handlers.
120    pub fn span(&self) -> &Span {
121        &self.span
122    }
123}
124
125impl EnvAccess for CronContext {
126    fn env_provider(&self) -> &dyn EnvProvider {
127        self.env_provider.as_ref()
128    }
129}
130
/// Structured logger for cron jobs.
///
/// Holds the cron name so it can be attached to each emitted log event.
#[derive(Clone, Debug)]
pub struct CronLog {
    /// Name of the cron, attached to every event as the `cron_name` field.
    cron_name: String,
}
136
137impl CronLog {
138    /// Create a new cron logger.
139    pub fn new(cron_name: String) -> Self {
140        Self { cron_name }
141    }
142
143    /// Log an info message.
144    pub fn info(&self, message: &str, data: serde_json::Value) {
145        tracing::info!(
146            cron_name = %self.cron_name,
147            data = %data,
148            "{}",
149            message
150        );
151    }
152
153    /// Log a warning message.
154    pub fn warn(&self, message: &str, data: serde_json::Value) {
155        tracing::warn!(
156            cron_name = %self.cron_name,
157            data = %data,
158            "{}",
159            message
160        );
161    }
162
163    /// Log an error message.
164    pub fn error(&self, message: &str, data: serde_json::Value) {
165        tracing::error!(
166            cron_name = %self.cron_name,
167            data = %data,
168            "{}",
169            message
170        );
171    }
172
173    /// Log a debug message.
174    pub fn debug(&self, message: &str, data: serde_json::Value) {
175        tracing::debug!(
176            cron_name = %self.cron_name,
177            data = %data,
178            "{}",
179            message
180        );
181    }
182}
183
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Pool configured with `connect_lazy`, pointed at a placeholder database URL.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Context for a non-catch-up `test_cron` run in UTC with the given id and schedule.
    fn test_context(run_id: Uuid, scheduled_time: DateTime<Utc>) -> CronContext {
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled_time,
            "UTC".to_string(),
            false,
            lazy_pool(),
            reqwest::Client::new(),
        )
    }

    #[tokio::test]
    async fn test_cron_context_creation() {
        let expected_id = Uuid::new_v4();
        let thirty_seconds_ago = Utc::now() - chrono::Duration::seconds(30);

        let ctx = test_context(expected_id, thirty_seconds_ago);

        assert_eq!(ctx.run_id, expected_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert!(!ctx.is_catch_up);
    }

    #[tokio::test]
    async fn test_cron_delay() {
        let five_minutes_ago = Utc::now() - chrono::Duration::minutes(5);

        let ctx = test_context(Uuid::new_v4(), five_minutes_ago);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    #[test]
    fn test_cron_log() {
        // Smoke test: logging must not panic even when no subscriber is installed.
        let logger = CronLog::new("test_cron".to_string());
        logger.info("Test message", serde_json::json!({"key": "value"}));
    }
}