forge_core/cron/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::AuthContext;
10use crate::http::CircuitBreakerClient;
11
12pub struct CronContext {
14 pub run_id: Uuid,
16 pub cron_name: String,
18 pub scheduled_time: DateTime<Utc>,
20 pub execution_time: DateTime<Utc>,
22 pub timezone: String,
24 pub is_catch_up: bool,
26 pub auth: AuthContext,
28 db_pool: sqlx::PgPool,
30 http_client: CircuitBreakerClient,
32 http_timeout: Option<Duration>,
35 pub log: CronLog,
37 env_provider: Arc<dyn EnvProvider>,
39 span: Span,
41}
42
43impl CronContext {
44 pub fn new(
46 run_id: Uuid,
47 cron_name: String,
48 scheduled_time: DateTime<Utc>,
49 timezone: String,
50 is_catch_up: bool,
51 db_pool: sqlx::PgPool,
52 http_client: CircuitBreakerClient,
53 ) -> Self {
54 Self {
55 run_id,
56 cron_name: cron_name.clone(),
57 scheduled_time,
58 execution_time: Utc::now(),
59 timezone,
60 is_catch_up,
61 auth: AuthContext::unauthenticated(),
62 db_pool,
63 http_client,
64 http_timeout: None,
65 log: CronLog::new(cron_name),
66 env_provider: Arc::new(RealEnvProvider::new()),
67 span: Span::current(),
68 }
69 }
70
71 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
73 self.env_provider = provider;
74 self
75 }
76
77 pub fn db(&self) -> crate::function::ForgeDb {
78 crate::function::ForgeDb::from_pool(&self.db_pool)
79 }
80
81 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
83 Ok(crate::function::ForgeConn::Pool(
84 self.db_pool.acquire().await?,
85 ))
86 }
87
88 pub fn http(&self) -> crate::http::HttpClient {
89 self.http_client.with_timeout(self.http_timeout)
90 }
91
92 pub fn raw_http(&self) -> &reqwest::Client {
93 self.http_client.inner()
94 }
95
96 pub fn http_with_circuit_breaker(&self) -> crate::http::HttpClient {
97 self.http()
98 }
99
100 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
101 self.http_timeout = timeout;
102 }
103
104 pub fn delay(&self) -> chrono::Duration {
106 self.execution_time - self.scheduled_time
107 }
108
109 pub fn is_late(&self) -> bool {
111 self.delay() > chrono::Duration::minutes(1)
112 }
113
114 pub fn with_auth(mut self, auth: AuthContext) -> Self {
116 self.auth = auth;
117 self
118 }
119
120 pub fn trace_id(&self) -> String {
124 self.run_id.to_string()
128 }
129
130 pub fn span(&self) -> &Span {
134 &self.span
135 }
136}
137
138impl EnvAccess for CronContext {
139 fn env_provider(&self) -> &dyn EnvProvider {
140 self.env_provider.as_ref()
141 }
142}
143
/// Logger handle that tags every event it emits with the owning cron's name.
///
/// Derives `Debug` in addition to `Clone` so the type can be printed in diagnostics and
/// embedded in other `Debug` types.
#[derive(Debug, Clone)]
pub struct CronLog {
    /// Name recorded as the `cron_name` field on each emitted event.
    cron_name: String,
}
149
150impl CronLog {
151 pub fn new(cron_name: String) -> Self {
153 Self { cron_name }
154 }
155
156 pub fn info(&self, message: &str, data: serde_json::Value) {
158 tracing::info!(
159 cron_name = %self.cron_name,
160 data = %data,
161 "{}",
162 message
163 );
164 }
165
166 pub fn warn(&self, message: &str, data: serde_json::Value) {
168 tracing::warn!(
169 cron_name = %self.cron_name,
170 data = %data,
171 "{}",
172 message
173 );
174 }
175
176 pub fn error(&self, message: &str, data: serde_json::Value) {
178 tracing::error!(
179 cron_name = %self.cron_name,
180 data = %data,
181 "{}",
182 message
183 );
184 }
185
186 pub fn debug(&self, message: &str, data: serde_json::Value) {
188 tracing::debug!(
189 cron_name = %self.cron_name,
190 data = %data,
191 "{}",
192 message
193 );
194 }
195}
196
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Creates a pool via `connect_lazy`, so no database connection is opened up front.
    /// The tests below never acquire a connection from it.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Builds a non-catch-up "test_cron" context in the UTC timezone for the given run id
    /// and scheduled time.
    fn test_context(run_id: Uuid, scheduled: DateTime<Utc>) -> CronContext {
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled,
            "UTC".to_string(),
            false,
            lazy_pool(),
            CircuitBreakerClient::with_defaults(reqwest::Client::new()),
        )
    }

    #[tokio::test]
    async fn test_cron_context_creation() {
        let run_id = Uuid::new_v4();
        let scheduled = Utc::now() - chrono::Duration::seconds(30);

        let ctx = test_context(run_id, scheduled);

        assert_eq!(ctx.run_id, run_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert_eq!(ctx.scheduled_time, scheduled);
        assert_eq!(ctx.timezone, "UTC");
        assert!(!ctx.is_catch_up);
        // The trace id is the run id rendered as a string.
        assert_eq!(ctx.trace_id(), run_id.to_string());
        // Thirty seconds behind schedule is below the one-minute lateness threshold.
        assert!(ctx.delay() >= chrono::Duration::seconds(30));
        assert!(!ctx.is_late());
    }

    #[tokio::test]
    async fn test_cron_delay() {
        let scheduled = Utc::now() - chrono::Duration::minutes(5);

        let ctx = test_context(Uuid::new_v4(), scheduled);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    #[test]
    fn test_cron_log() {
        // Smoke test: every level must accept a message plus a JSON payload without panicking.
        let log = CronLog::new("test_cron".to_string());
        log.info("Test message", serde_json::json!({"key": "value"}));
        log.warn("Test message", serde_json::json!({"key": "value"}));
        log.error("Test message", serde_json::json!({"key": "value"}));
        log.debug("Test message", serde_json::json!({"key": "value"}));
    }
}