forge_core/cron/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::AuthContext;
10use crate::http::CircuitBreakerClient;
11
12pub struct CronContext {
14 pub run_id: Uuid,
16 pub cron_name: String,
18 pub scheduled_time: DateTime<Utc>,
20 pub execution_time: DateTime<Utc>,
22 pub timezone: String,
24 pub is_catch_up: bool,
26 pub auth: AuthContext,
28 db_pool: sqlx::PgPool,
30 http_client: CircuitBreakerClient,
32 http_timeout: Option<Duration>,
35 pub log: CronLog,
37 env_provider: Arc<dyn EnvProvider>,
39 span: Span,
41}
42
43impl CronContext {
44 pub fn new(
46 run_id: Uuid,
47 cron_name: String,
48 scheduled_time: DateTime<Utc>,
49 timezone: String,
50 is_catch_up: bool,
51 db_pool: sqlx::PgPool,
52 http_client: CircuitBreakerClient,
53 ) -> Self {
54 Self {
55 run_id,
56 cron_name: cron_name.clone(),
57 scheduled_time,
58 execution_time: Utc::now(),
59 timezone,
60 is_catch_up,
61 auth: AuthContext::unauthenticated(),
62 db_pool,
63 http_client,
64 http_timeout: None,
65 log: CronLog::new(cron_name),
66 env_provider: Arc::new(RealEnvProvider::new()),
67 span: Span::current(),
68 }
69 }
70
71 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
73 self.env_provider = provider;
74 self
75 }
76
77 pub fn db(&self) -> crate::function::ForgeDb {
78 crate::function::ForgeDb::from_pool(&self.db_pool)
79 }
80
81 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
83 Ok(crate::function::ForgeConn::Pool(
84 self.db_pool.acquire().await?,
85 ))
86 }
87
88 pub fn http(&self) -> crate::http::HttpClient {
89 self.http_client.with_timeout(self.http_timeout)
90 }
91
92 pub fn raw_http(&self) -> &reqwest::Client {
93 self.http_client.inner()
94 }
95
96 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
97 self.http_timeout = timeout;
98 }
99
100 pub fn delay(&self) -> chrono::Duration {
102 self.execution_time - self.scheduled_time
103 }
104
105 pub fn is_late(&self) -> bool {
107 self.delay() > chrono::Duration::minutes(1)
108 }
109
110 pub fn with_auth(mut self, auth: AuthContext) -> Self {
112 self.auth = auth;
113 self
114 }
115
116 pub fn trace_id(&self) -> String {
120 self.run_id.to_string()
124 }
125
126 pub fn span(&self) -> &Span {
130 &self.span
131 }
132}
133
134impl EnvAccess for CronContext {
135 fn env_provider(&self) -> &dyn EnvProvider {
136 self.env_provider.as_ref()
137 }
138}
139
/// Logger that tags every event it emits with the owning cron's name.
///
/// Derives `Debug` in addition to `Clone` so the type can be used in `{:?}`
/// formatting and embedded in other `#[derive(Debug)]` types.
#[derive(Debug, Clone)]
pub struct CronLog {
    /// Name attached to each log event as the `cron_name` field.
    cron_name: String,
}
145
146impl CronLog {
147 pub fn new(cron_name: String) -> Self {
149 Self { cron_name }
150 }
151
152 pub fn info(&self, message: &str, data: serde_json::Value) {
154 tracing::info!(
155 cron_name = %self.cron_name,
156 data = %data,
157 "{}",
158 message
159 );
160 }
161
162 pub fn warn(&self, message: &str, data: serde_json::Value) {
164 tracing::warn!(
165 cron_name = %self.cron_name,
166 data = %data,
167 "{}",
168 message
169 );
170 }
171
172 pub fn error(&self, message: &str, data: serde_json::Value) {
174 tracing::error!(
175 cron_name = %self.cron_name,
176 data = %data,
177 "{}",
178 message
179 );
180 }
181
182 pub fn debug(&self, message: &str, data: serde_json::Value) {
184 tracing::debug!(
185 cron_name = %self.cron_name,
186 data = %data,
187 "{}",
188 message
189 );
190 }
191}
192
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Single-connection pool created with `connect_lazy`; these tests never
    /// issue a query against it.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Builds a non-catch-up `test_cron` context in UTC for the given run.
    fn make_context(run_id: Uuid, scheduled: DateTime<Utc>) -> CronContext {
        let http = CircuitBreakerClient::with_defaults(reqwest::Client::new());
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled,
            "UTC".to_string(),
            false,
            lazy_pool(),
            http,
        )
    }

    #[tokio::test]
    async fn test_cron_context_creation() {
        let run_id = Uuid::new_v4();
        let scheduled = Utc::now() - chrono::Duration::seconds(30);

        let ctx = make_context(run_id, scheduled);

        assert_eq!(ctx.run_id, run_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert!(!ctx.is_catch_up);
    }

    #[tokio::test]
    async fn test_cron_delay() {
        // Scheduled five minutes in the past, so the run must count as late.
        let scheduled = Utc::now() - chrono::Duration::minutes(5);

        let ctx = make_context(Uuid::new_v4(), scheduled);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    #[test]
    fn test_cron_log() {
        // Smoke test: logging must not panic.
        let log = CronLog::new("test_cron".to_string());
        log.info("Test message", serde_json::json!({"key": "value"}));
    }
}