Skip to main content

forge_core/cron/
context.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::{AuthContext, KvHandle};
10use crate::http::CircuitBreakerClient;
11
12/// Context available to cron handlers.
13#[non_exhaustive]
14pub struct CronContext {
15    pub run_id: Uuid,
16    pub cron_name: String,
17    pub scheduled_time: DateTime<Utc>,
18    pub execution_time: DateTime<Utc>,
19    pub timezone: String,
20    pub is_catch_up: bool,
21    pub auth: AuthContext,
22    db_pool: sqlx::PgPool,
23    http_client: CircuitBreakerClient,
24    /// `None` means unlimited.
25    http_timeout: Option<Duration>,
26    env_provider: Arc<dyn EnvProvider>,
27    span: Span,
28    kv: Option<Arc<dyn KvHandle>>,
29}
30
31impl CronContext {
32    /// Create a new cron context.
33    pub fn new(
34        run_id: Uuid,
35        cron_name: impl Into<String>,
36        scheduled_time: DateTime<Utc>,
37        timezone: String,
38        is_catch_up: bool,
39        db_pool: sqlx::PgPool,
40        http_client: CircuitBreakerClient,
41    ) -> Self {
42        let cron_name = cron_name.into();
43        Self {
44            run_id,
45            cron_name,
46            scheduled_time,
47            execution_time: Utc::now(),
48            timezone,
49            is_catch_up,
50            auth: AuthContext::unauthenticated(),
51            db_pool,
52            http_client,
53            http_timeout: None,
54            env_provider: Arc::new(RealEnvProvider::new()),
55            span: Span::current(),
56            kv: None,
57        }
58    }
59
60    /// Attach a KV store handle. Called by the runtime before handing the
61    /// context to the handler.
62    pub fn with_kv(mut self, kv: Arc<dyn KvHandle>) -> Self {
63        self.kv = Some(kv);
64        self
65    }
66
67    /// Access the KV store.
68    pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
69        self.kv
70            .as_deref()
71            .ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
72    }
73
74    /// Set environment provider.
75    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
76        self.env_provider = provider;
77        self
78    }
79
80    pub fn db(&self) -> crate::function::ForgeDb {
81        crate::function::ForgeDb::from_pool(&self.db_pool)
82    }
83
84    /// Get a `DbConn` for use in shared helper functions.
85    pub fn db_conn(&self) -> crate::function::DbConn<'_> {
86        crate::function::DbConn::Pool(self.db_pool.clone())
87    }
88
89    /// Acquire a connection compatible with sqlx compile-time checked macros.
90    pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
91        Ok(crate::function::ForgeConn::Pool(
92            self.db_pool.acquire().await?,
93        ))
94    }
95
96    pub fn http(&self) -> crate::http::HttpClient {
97        self.http_client.with_timeout(self.http_timeout)
98    }
99
100    pub fn raw_http(&self) -> &reqwest::Client {
101        self.http_client.inner()
102    }
103
104    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
105        self.http_timeout = timeout;
106    }
107
108    /// Get the delay between scheduled and actual execution time.
109    pub fn delay(&self) -> chrono::Duration {
110        self.execution_time - self.scheduled_time
111    }
112
113    /// Check if the cron is running late (more than 1 minute delay).
114    pub fn is_late(&self) -> bool {
115        self.delay() > chrono::Duration::minutes(1)
116    }
117
118    /// Set authentication context.
119    pub fn with_auth(mut self, auth: AuthContext) -> Self {
120        self.auth = auth;
121        self
122    }
123
124    /// Get the trace ID for this cron execution.
125    ///
126    /// Returns the trace ID if OpenTelemetry is configured, otherwise returns the run_id.
127    pub fn trace_id(&self) -> String {
128        // The span carries the trace context. When OTel is configured,
129        // the trace ID can be extracted from the span context.
130        // For now, return the run_id as a fallback correlation ID.
131        self.run_id.to_string()
132    }
133
134    /// Get the parent span for trace propagation.
135    ///
136    /// Use this to create child spans within cron handlers.
137    pub fn span(&self) -> &Span {
138        &self.span
139    }
140}
141
142impl EnvAccess for CronContext {
143    fn env_provider(&self) -> &dyn EnvProvider {
144        self.env_provider.as_ref()
145    }
146}
147
148#[cfg(test)]
149#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
150mod tests {
151    use super::*;
152    use crate::env::MockEnvProvider;
153
154    fn make_ctx(scheduled: DateTime<Utc>, is_catch_up: bool) -> CronContext {
155        let pool = sqlx::postgres::PgPoolOptions::new()
156            .max_connections(1)
157            .connect_lazy("postgres://localhost/nonexistent")
158            .expect("Failed to create mock pool");
159        CronContext::new(
160            Uuid::new_v4(),
161            "test_cron".to_string(),
162            scheduled,
163            "UTC".to_string(),
164            is_catch_up,
165            pool,
166            CircuitBreakerClient::with_defaults(reqwest::Client::new()),
167        )
168    }
169
170    #[tokio::test]
171    async fn test_cron_context_creation() {
172        let scheduled = Utc::now() - chrono::Duration::seconds(30);
173        let ctx = make_ctx(scheduled, false);
174
175        assert_eq!(ctx.cron_name, "test_cron");
176        assert!(!ctx.is_catch_up);
177        // Default auth is unauthenticated.
178        assert!(!ctx.auth.is_authenticated());
179        // execution_time is set after scheduled, so delay must be non-negative.
180        assert!(ctx.delay() >= chrono::Duration::zero());
181    }
182
183    #[tokio::test]
184    async fn test_cron_delay() {
185        let scheduled = Utc::now() - chrono::Duration::minutes(5);
186        let ctx = make_ctx(scheduled, false);
187
188        assert!(ctx.is_late());
189        assert!(ctx.delay() >= chrono::Duration::minutes(5));
190    }
191
192    #[tokio::test]
193    async fn cron_on_time_is_not_late() {
194        // scheduled in the future or roughly now -> delay <= 1m, not late
195        let ctx = make_ctx(Utc::now() + chrono::Duration::seconds(5), false);
196        assert!(!ctx.is_late());
197    }
198
199    #[tokio::test]
200    async fn cron_catch_up_flag_round_trips() {
201        let ctx = make_ctx(Utc::now() - chrono::Duration::minutes(30), true);
202        assert!(ctx.is_catch_up);
203    }
204
205    #[tokio::test]
206    async fn cron_trace_id_returns_run_id_as_string() {
207        let ctx = make_ctx(Utc::now(), false);
208        assert_eq!(ctx.trace_id(), ctx.run_id.to_string());
209    }
210
211    #[tokio::test]
212    async fn cron_with_auth_replaces_default() {
213        use std::collections::HashMap;
214        let uid = Uuid::new_v4();
215        let auth = AuthContext::authenticated(uid, vec!["admin".to_string()], HashMap::new());
216        let ctx = make_ctx(Utc::now(), false).with_auth(auth);
217        assert!(ctx.auth.is_authenticated());
218        assert!(ctx.auth.has_role("admin"));
219    }
220
221    #[tokio::test]
222    async fn cron_with_env_provider_overrides_real() {
223        let mut mock = MockEnvProvider::new();
224        mock.set("FORGE_CRON_KEY", "v");
225        let ctx = make_ctx(Utc::now(), false).with_env_provider(Arc::new(mock));
226        use crate::env::EnvAccess;
227        assert_eq!(ctx.env("FORGE_CRON_KEY"), Some("v".to_string()));
228        assert_eq!(ctx.env("FORGE_MISSING"), None);
229    }
230
231    #[tokio::test]
232    async fn cron_set_http_timeout_does_not_panic() {
233        let mut ctx = make_ctx(Utc::now(), false);
234        ctx.set_http_timeout(Some(Duration::from_millis(100)));
235        let _ = ctx.http();
236        ctx.set_http_timeout(None);
237        let _ = ctx.http();
238    }
239}