forge_core/cron/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::{AuthContext, KvHandle};
10use crate::http::CircuitBreakerClient;
11
12#[non_exhaustive]
14pub struct CronContext {
15 pub run_id: Uuid,
16 pub cron_name: String,
17 pub scheduled_time: DateTime<Utc>,
18 pub execution_time: DateTime<Utc>,
19 pub timezone: String,
20 pub is_catch_up: bool,
21 pub auth: AuthContext,
22 db_pool: sqlx::PgPool,
23 http_client: CircuitBreakerClient,
24 http_timeout: Option<Duration>,
26 env_provider: Arc<dyn EnvProvider>,
27 span: Span,
28 kv: Option<Arc<dyn KvHandle>>,
29}
30
31impl CronContext {
32 pub fn new(
34 run_id: Uuid,
35 cron_name: impl Into<String>,
36 scheduled_time: DateTime<Utc>,
37 timezone: String,
38 is_catch_up: bool,
39 db_pool: sqlx::PgPool,
40 http_client: CircuitBreakerClient,
41 ) -> Self {
42 let cron_name = cron_name.into();
43 Self {
44 run_id,
45 cron_name,
46 scheduled_time,
47 execution_time: Utc::now(),
48 timezone,
49 is_catch_up,
50 auth: AuthContext::unauthenticated(),
51 db_pool,
52 http_client,
53 http_timeout: None,
54 env_provider: Arc::new(RealEnvProvider::new()),
55 span: Span::current(),
56 kv: None,
57 }
58 }
59
60 pub fn with_kv(mut self, kv: Arc<dyn KvHandle>) -> Self {
63 self.kv = Some(kv);
64 self
65 }
66
67 pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
69 self.kv
70 .as_deref()
71 .ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
72 }
73
74 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
76 self.env_provider = provider;
77 self
78 }
79
80 pub fn db(&self) -> crate::function::ForgeDb {
81 crate::function::ForgeDb::from_pool(&self.db_pool)
82 }
83
84 pub fn db_conn(&self) -> crate::function::DbConn<'_> {
86 crate::function::DbConn::Pool(self.db_pool.clone())
87 }
88
89 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
91 Ok(crate::function::ForgeConn::Pool(
92 self.db_pool.acquire().await?,
93 ))
94 }
95
96 pub fn http(&self) -> crate::http::HttpClient {
97 self.http_client.with_timeout(self.http_timeout)
98 }
99
100 pub fn raw_http(&self) -> &reqwest::Client {
101 self.http_client.inner()
102 }
103
104 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
105 self.http_timeout = timeout;
106 }
107
108 pub fn delay(&self) -> chrono::Duration {
110 self.execution_time - self.scheduled_time
111 }
112
113 pub fn is_late(&self) -> bool {
115 self.delay() > chrono::Duration::minutes(1)
116 }
117
118 pub fn with_auth(mut self, auth: AuthContext) -> Self {
120 self.auth = auth;
121 self
122 }
123
124 pub fn trace_id(&self) -> String {
128 self.run_id.to_string()
132 }
133
134 pub fn span(&self) -> &Span {
138 &self.span
139 }
140}
141
142impl EnvAccess for CronContext {
143 fn env_provider(&self) -> &dyn EnvProvider {
144 self.env_provider.as_ref()
145 }
146}
147
148#[cfg(test)]
149#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
150mod tests {
151 use super::*;
152 use crate::env::MockEnvProvider;
153
154 fn make_ctx(scheduled: DateTime<Utc>, is_catch_up: bool) -> CronContext {
155 let pool = sqlx::postgres::PgPoolOptions::new()
156 .max_connections(1)
157 .connect_lazy("postgres://localhost/nonexistent")
158 .expect("Failed to create mock pool");
159 CronContext::new(
160 Uuid::new_v4(),
161 "test_cron".to_string(),
162 scheduled,
163 "UTC".to_string(),
164 is_catch_up,
165 pool,
166 CircuitBreakerClient::with_defaults(reqwest::Client::new()),
167 )
168 }
169
170 #[tokio::test]
171 async fn test_cron_context_creation() {
172 let scheduled = Utc::now() - chrono::Duration::seconds(30);
173 let ctx = make_ctx(scheduled, false);
174
175 assert_eq!(ctx.cron_name, "test_cron");
176 assert!(!ctx.is_catch_up);
177 assert!(!ctx.auth.is_authenticated());
179 assert!(ctx.delay() >= chrono::Duration::zero());
181 }
182
183 #[tokio::test]
184 async fn test_cron_delay() {
185 let scheduled = Utc::now() - chrono::Duration::minutes(5);
186 let ctx = make_ctx(scheduled, false);
187
188 assert!(ctx.is_late());
189 assert!(ctx.delay() >= chrono::Duration::minutes(5));
190 }
191
192 #[tokio::test]
193 async fn cron_on_time_is_not_late() {
194 let ctx = make_ctx(Utc::now() + chrono::Duration::seconds(5), false);
196 assert!(!ctx.is_late());
197 }
198
199 #[tokio::test]
200 async fn cron_catch_up_flag_round_trips() {
201 let ctx = make_ctx(Utc::now() - chrono::Duration::minutes(30), true);
202 assert!(ctx.is_catch_up);
203 }
204
205 #[tokio::test]
206 async fn cron_trace_id_returns_run_id_as_string() {
207 let ctx = make_ctx(Utc::now(), false);
208 assert_eq!(ctx.trace_id(), ctx.run_id.to_string());
209 }
210
211 #[tokio::test]
212 async fn cron_with_auth_replaces_default() {
213 use std::collections::HashMap;
214 let uid = Uuid::new_v4();
215 let auth = AuthContext::authenticated(uid, vec!["admin".to_string()], HashMap::new());
216 let ctx = make_ctx(Utc::now(), false).with_auth(auth);
217 assert!(ctx.auth.is_authenticated());
218 assert!(ctx.auth.has_role("admin"));
219 }
220
221 #[tokio::test]
222 async fn cron_with_env_provider_overrides_real() {
223 let mut mock = MockEnvProvider::new();
224 mock.set("FORGE_CRON_KEY", "v");
225 let ctx = make_ctx(Utc::now(), false).with_env_provider(Arc::new(mock));
226 use crate::env::EnvAccess;
227 assert_eq!(ctx.env("FORGE_CRON_KEY"), Some("v".to_string()));
228 assert_eq!(ctx.env("FORGE_MISSING"), None);
229 }
230
231 #[tokio::test]
232 async fn cron_set_http_timeout_does_not_panic() {
233 let mut ctx = make_ctx(Utc::now(), false);
234 ctx.set_http_timeout(Some(Duration::from_millis(100)));
235 let _ = ctx.http();
236 ctx.set_http_timeout(None);
237 let _ = ctx.http();
238 }
239}