forge_core/cron/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::AuthContext;
10use crate::http::CircuitBreakerClient;
11
12pub struct CronContext {
14 pub run_id: Uuid,
16 pub cron_name: String,
18 pub scheduled_time: DateTime<Utc>,
20 pub execution_time: DateTime<Utc>,
22 pub timezone: String,
24 pub is_catch_up: bool,
26 pub auth: AuthContext,
28 db_pool: sqlx::PgPool,
30 http_client: CircuitBreakerClient,
32 http_timeout: Option<Duration>,
35 pub log: CronLog,
37 env_provider: Arc<dyn EnvProvider>,
39 span: Span,
41}
42
43impl CronContext {
44 pub fn new(
46 run_id: Uuid,
47 cron_name: String,
48 scheduled_time: DateTime<Utc>,
49 timezone: String,
50 is_catch_up: bool,
51 db_pool: sqlx::PgPool,
52 http_client: CircuitBreakerClient,
53 ) -> Self {
54 Self {
55 run_id,
56 cron_name: cron_name.clone(),
57 scheduled_time,
58 execution_time: Utc::now(),
59 timezone,
60 is_catch_up,
61 auth: AuthContext::unauthenticated(),
62 db_pool,
63 http_client,
64 http_timeout: None,
65 log: CronLog::new(cron_name),
66 env_provider: Arc::new(RealEnvProvider::new()),
67 span: Span::current(),
68 }
69 }
70
71 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
73 self.env_provider = provider;
74 self
75 }
76
77 pub fn db(&self) -> crate::function::ForgeDb {
78 crate::function::ForgeDb::from_pool(&self.db_pool)
79 }
80
81 pub fn db_conn(&self) -> crate::function::DbConn<'_> {
83 crate::function::DbConn::Pool(self.db_pool.clone())
84 }
85
86 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
88 Ok(crate::function::ForgeConn::Pool(
89 self.db_pool.acquire().await?,
90 ))
91 }
92
93 pub fn http(&self) -> crate::http::HttpClient {
94 self.http_client.with_timeout(self.http_timeout)
95 }
96
97 pub fn raw_http(&self) -> &reqwest::Client {
98 self.http_client.inner()
99 }
100
101 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
102 self.http_timeout = timeout;
103 }
104
105 pub fn delay(&self) -> chrono::Duration {
107 self.execution_time - self.scheduled_time
108 }
109
110 pub fn is_late(&self) -> bool {
112 self.delay() > chrono::Duration::minutes(1)
113 }
114
115 pub fn with_auth(mut self, auth: AuthContext) -> Self {
117 self.auth = auth;
118 self
119 }
120
121 pub fn trace_id(&self) -> String {
125 self.run_id.to_string()
129 }
130
131 pub fn span(&self) -> &Span {
135 &self.span
136 }
137}
138
139impl EnvAccess for CronContext {
140 fn env_provider(&self) -> &dyn EnvProvider {
141 self.env_provider.as_ref()
142 }
143}
144
/// Small logging handle that attaches a cron's name to every event it emits.
#[derive(Clone)]
pub struct CronLog {
    /// Value recorded as the `cron_name` field on each emitted event.
    cron_name: String,
}
150
151impl CronLog {
152 pub fn new(cron_name: String) -> Self {
154 Self { cron_name }
155 }
156
157 pub fn info(&self, message: &str, data: serde_json::Value) {
159 tracing::info!(
160 cron_name = %self.cron_name,
161 data = %data,
162 "{}",
163 message
164 );
165 }
166
167 pub fn warn(&self, message: &str, data: serde_json::Value) {
169 tracing::warn!(
170 cron_name = %self.cron_name,
171 data = %data,
172 "{}",
173 message
174 );
175 }
176
177 pub fn error(&self, message: &str, data: serde_json::Value) {
179 tracing::error!(
180 cron_name = %self.cron_name,
181 data = %data,
182 "{}",
183 message
184 );
185 }
186
187 pub fn debug(&self, message: &str, data: serde_json::Value) {
189 tracing::debug!(
190 cron_name = %self.cron_name,
191 data = %data,
192 "{}",
193 message
194 );
195 }
196}
197
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Lazily-connecting pool pointed at a database that is never contacted;
    /// it only needs to exist so a context can be constructed.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Builds a non-catch-up "test_cron" context in UTC for the given run and schedule.
    fn test_context(run_id: Uuid, scheduled: DateTime<Utc>) -> CronContext {
        let http = CircuitBreakerClient::with_defaults(reqwest::Client::new());
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled,
            "UTC".to_string(),
            false,
            lazy_pool(),
            http,
        )
    }

    #[tokio::test]
    async fn test_cron_context_creation() {
        let run_id = Uuid::new_v4();
        let scheduled = Utc::now() - chrono::Duration::seconds(30);

        let ctx = test_context(run_id, scheduled);

        assert_eq!(ctx.run_id, run_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert!(!ctx.is_catch_up);
    }

    #[tokio::test]
    async fn test_cron_delay() {
        // Scheduled five minutes in the past, so the run must count as late.
        let scheduled = Utc::now() - chrono::Duration::minutes(5);

        let ctx = test_context(Uuid::new_v4(), scheduled);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    #[test]
    fn test_cron_log() {
        // Smoke test: logging must not panic.
        let log = CronLog::new("test_cron".to_string());
        log.info("Test message", serde_json::json!({"key": "value"}));
    }
}