Skip to main content

forge_core/cron/
context.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::AuthContext;
10use crate::http::CircuitBreakerClient;
11
12/// Context available to cron handlers.
13pub struct CronContext {
14    /// Cron run ID.
15    pub run_id: Uuid,
16    /// Cron name.
17    pub cron_name: String,
18    /// Scheduled time (when the cron was supposed to run).
19    pub scheduled_time: DateTime<Utc>,
20    /// Actual execution time.
21    pub execution_time: DateTime<Utc>,
22    /// Timezone of the cron.
23    pub timezone: String,
24    /// Whether this is a catch-up run.
25    pub is_catch_up: bool,
26    /// Authentication context.
27    pub auth: AuthContext,
28    /// Database pool.
29    db_pool: sqlx::PgPool,
30    /// HTTP client.
31    http_client: CircuitBreakerClient,
32    /// Default timeout for outbound HTTP requests made through the
33    /// circuit-breaker client. `None` means unlimited.
34    http_timeout: Option<Duration>,
35    /// Structured logger.
36    pub log: CronLog,
37    /// Environment variable provider.
38    env_provider: Arc<dyn EnvProvider>,
39    /// Parent span for trace propagation.
40    span: Span,
41}
42
43impl CronContext {
44    /// Create a new cron context.
45    pub fn new(
46        run_id: Uuid,
47        cron_name: String,
48        scheduled_time: DateTime<Utc>,
49        timezone: String,
50        is_catch_up: bool,
51        db_pool: sqlx::PgPool,
52        http_client: CircuitBreakerClient,
53    ) -> Self {
54        Self {
55            run_id,
56            cron_name: cron_name.clone(),
57            scheduled_time,
58            execution_time: Utc::now(),
59            timezone,
60            is_catch_up,
61            auth: AuthContext::unauthenticated(),
62            db_pool,
63            http_client,
64            http_timeout: None,
65            log: CronLog::new(cron_name),
66            env_provider: Arc::new(RealEnvProvider::new()),
67            span: Span::current(),
68        }
69    }
70
71    /// Set environment provider.
72    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
73        self.env_provider = provider;
74        self
75    }
76
77    pub fn db(&self) -> crate::function::ForgeDb {
78        crate::function::ForgeDb::from_pool(&self.db_pool)
79    }
80
81    /// Get a `DbConn` for use in shared helper functions.
82    pub fn db_conn(&self) -> crate::function::DbConn<'_> {
83        crate::function::DbConn::Pool(self.db_pool.clone())
84    }
85
86    /// Acquire a connection compatible with sqlx compile-time checked macros.
87    pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
88        Ok(crate::function::ForgeConn::Pool(
89            self.db_pool.acquire().await?,
90        ))
91    }
92
93    pub fn http(&self) -> crate::http::HttpClient {
94        self.http_client.with_timeout(self.http_timeout)
95    }
96
97    pub fn raw_http(&self) -> &reqwest::Client {
98        self.http_client.inner()
99    }
100
101    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
102        self.http_timeout = timeout;
103    }
104
105    /// Get the delay between scheduled and actual execution time.
106    pub fn delay(&self) -> chrono::Duration {
107        self.execution_time - self.scheduled_time
108    }
109
110    /// Check if the cron is running late (more than 1 minute delay).
111    pub fn is_late(&self) -> bool {
112        self.delay() > chrono::Duration::minutes(1)
113    }
114
115    /// Set authentication context.
116    pub fn with_auth(mut self, auth: AuthContext) -> Self {
117        self.auth = auth;
118        self
119    }
120
121    /// Get the trace ID for this cron execution.
122    ///
123    /// Returns the trace ID if OpenTelemetry is configured, otherwise returns the run_id.
124    pub fn trace_id(&self) -> String {
125        // The span carries the trace context. When OTel is configured,
126        // the trace ID can be extracted from the span context.
127        // For now, return the run_id as a fallback correlation ID.
128        self.run_id.to_string()
129    }
130
131    /// Get the parent span for trace propagation.
132    ///
133    /// Use this to create child spans within cron handlers.
134    pub fn span(&self) -> &Span {
135        &self.span
136    }
137}
138
139impl EnvAccess for CronContext {
140    fn env_provider(&self) -> &dyn EnvProvider {
141        self.env_provider.as_ref()
142    }
143}
144
/// Structured logger for cron jobs.
///
/// Holds the cron name so every emitted event can be tagged with it.
#[derive(Debug, Clone)]
pub struct CronLog {
    /// Name of the cron, attached to each log event as `cron_name`.
    cron_name: String,
}
150
151impl CronLog {
152    /// Create a new cron logger.
153    pub fn new(cron_name: String) -> Self {
154        Self { cron_name }
155    }
156
157    /// Log an info message.
158    pub fn info(&self, message: &str, data: serde_json::Value) {
159        tracing::info!(
160            cron_name = %self.cron_name,
161            data = %data,
162            "{}",
163            message
164        );
165    }
166
167    /// Log a warning message.
168    pub fn warn(&self, message: &str, data: serde_json::Value) {
169        tracing::warn!(
170            cron_name = %self.cron_name,
171            data = %data,
172            "{}",
173            message
174        );
175    }
176
177    /// Log an error message.
178    pub fn error(&self, message: &str, data: serde_json::Value) {
179        tracing::error!(
180            cron_name = %self.cron_name,
181            data = %data,
182            "{}",
183            message
184        );
185    }
186
187    /// Log a debug message.
188    pub fn debug(&self, message: &str, data: serde_json::Value) {
189        tracing::debug!(
190            cron_name = %self.cron_name,
191            data = %data,
192            "{}",
193            message
194        );
195    }
196}
197
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Build a single-connection pool with `connect_lazy`, so constructing it
    /// does not require a reachable database.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Build a non-catch-up `test_cron` context in UTC for the given run ID
    /// and scheduled time.
    fn context_for(run_id: Uuid, scheduled: DateTime<Utc>) -> CronContext {
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled,
            "UTC".to_string(),
            false,
            lazy_pool(),
            CircuitBreakerClient::with_defaults(reqwest::Client::new()),
        )
    }

    /// The constructor stores the identifying fields it was given.
    #[tokio::test]
    async fn test_cron_context_creation() {
        let run_id = Uuid::new_v4();
        let thirty_seconds_ago = Utc::now() - chrono::Duration::seconds(30);

        let ctx = context_for(run_id, thirty_seconds_ago);

        assert_eq!(ctx.run_id, run_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert!(!ctx.is_catch_up);
    }

    /// A run scheduled five minutes in the past is reported as late.
    #[tokio::test]
    async fn test_cron_delay() {
        let five_minutes_ago = Utc::now() - chrono::Duration::minutes(5);

        let ctx = context_for(Uuid::new_v4(), five_minutes_ago);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    /// Logging through `CronLog` does not panic.
    #[test]
    fn test_cron_log() {
        let logger = CronLog::new("test_cron".to_string());
        logger.info("Test message", serde_json::json!({"key": "value"}));
    }
}
256}