forge_core/cron/
context.rs1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use tracing::Span;
5use uuid::Uuid;
6
7use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
8use crate::function::AuthContext;
9
10pub struct CronContext {
12 pub run_id: Uuid,
14 pub cron_name: String,
16 pub scheduled_time: DateTime<Utc>,
18 pub execution_time: DateTime<Utc>,
20 pub timezone: String,
22 pub is_catch_up: bool,
24 pub auth: AuthContext,
26 db_pool: sqlx::PgPool,
28 http_client: reqwest::Client,
30 pub log: CronLog,
32 env_provider: Arc<dyn EnvProvider>,
34 span: Span,
36}
37
38impl CronContext {
39 pub fn new(
41 run_id: Uuid,
42 cron_name: String,
43 scheduled_time: DateTime<Utc>,
44 timezone: String,
45 is_catch_up: bool,
46 db_pool: sqlx::PgPool,
47 http_client: reqwest::Client,
48 ) -> Self {
49 Self {
50 run_id,
51 cron_name: cron_name.clone(),
52 scheduled_time,
53 execution_time: Utc::now(),
54 timezone,
55 is_catch_up,
56 auth: AuthContext::unauthenticated(),
57 db_pool,
58 http_client,
59 log: CronLog::new(cron_name),
60 env_provider: Arc::new(RealEnvProvider::new()),
61 span: Span::current(),
62 }
63 }
64
65 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
67 self.env_provider = provider;
68 self
69 }
70
71 pub fn db(&self) -> &sqlx::PgPool {
72 &self.db_pool
73 }
74
75 pub fn db_conn(&self) -> crate::function::DbConn<'_> {
77 crate::function::DbConn::Pool(&self.db_pool)
78 }
79
80 pub fn http(&self) -> &reqwest::Client {
81 &self.http_client
82 }
83
84 pub fn delay(&self) -> chrono::Duration {
86 self.execution_time - self.scheduled_time
87 }
88
89 pub fn is_late(&self) -> bool {
91 self.delay() > chrono::Duration::minutes(1)
92 }
93
94 pub fn with_auth(mut self, auth: AuthContext) -> Self {
96 self.auth = auth;
97 self
98 }
99
100 pub fn trace_id(&self) -> String {
104 self.run_id.to_string()
108 }
109
110 pub fn span(&self) -> &Span {
114 &self.span
115 }
116}
117
118impl EnvAccess for CronContext {
119 fn env_provider(&self) -> &dyn EnvProvider {
120 self.env_provider.as_ref()
121 }
122}
123
/// Logging handle that attaches a cron's name to the tracing events it emits.
#[derive(Clone)]
pub struct CronLog {
    /// Recorded as the `cron_name` field on every event.
    cron_name: String,
}
129
130impl CronLog {
131 pub fn new(cron_name: String) -> Self {
133 Self { cron_name }
134 }
135
136 pub fn info(&self, message: &str, data: serde_json::Value) {
138 tracing::info!(
139 cron_name = %self.cron_name,
140 data = %data,
141 "{}",
142 message
143 );
144 }
145
146 pub fn warn(&self, message: &str, data: serde_json::Value) {
148 tracing::warn!(
149 cron_name = %self.cron_name,
150 data = %data,
151 "{}",
152 message
153 );
154 }
155
156 pub fn error(&self, message: &str, data: serde_json::Value) {
158 tracing::error!(
159 cron_name = %self.cron_name,
160 data = %data,
161 "{}",
162 message
163 );
164 }
165
166 pub fn debug(&self, message: &str, data: serde_json::Value) {
168 tracing::debug!(
169 cron_name = %self.cron_name,
170 data = %data,
171 "{}",
172 message
173 );
174 }
175}
176
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a single-connection pool with `connect_lazy`, pointed at a
    /// database name that is not expected to exist.
    // NOTE(review): relies on sqlx's `connect_lazy` not opening a connection
    // up front, so no database has to be reachable — confirm if sqlx is upgraded.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Builds a non-catch-up "test_cron" context in UTC for the given run id
    /// and scheduled time.
    fn test_context(run_id: Uuid, scheduled: DateTime<Utc>) -> CronContext {
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled,
            "UTC".to_string(),
            false,
            lazy_pool(),
            reqwest::Client::new(),
        )
    }

    /// The constructor stores the run id, name and catch-up flag it was given.
    #[tokio::test]
    async fn test_cron_context_creation() {
        let run_id = Uuid::new_v4();
        let scheduled = Utc::now() - chrono::Duration::seconds(30);

        let ctx = test_context(run_id, scheduled);

        assert_eq!(ctx.run_id, run_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert!(!ctx.is_catch_up);
    }

    /// A run scheduled five minutes in the past is late and reports at least
    /// that much delay.
    #[tokio::test]
    async fn test_cron_delay() {
        let scheduled = Utc::now() - chrono::Duration::minutes(5);

        let ctx = test_context(Uuid::new_v4(), scheduled);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    /// Smoke test: logging at INFO level with a JSON payload does not panic.
    #[test]
    fn test_cron_log() {
        let log = CronLog::new("test_cron".to_string());
        log.info("Test message", serde_json::json!({"key": "value"}));
    }
}