forge_core/cron/
context.rs1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use tracing::Span;
5use uuid::Uuid;
6
7use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
8use crate::function::AuthContext;
9
10pub struct CronContext {
12 pub run_id: Uuid,
14 pub cron_name: String,
16 pub scheduled_time: DateTime<Utc>,
18 pub execution_time: DateTime<Utc>,
20 pub timezone: String,
22 pub is_catch_up: bool,
24 pub auth: AuthContext,
26 db_pool: sqlx::PgPool,
28 http_client: reqwest::Client,
30 pub log: CronLog,
32 env_provider: Arc<dyn EnvProvider>,
34 span: Span,
36}
37
38impl CronContext {
39 pub fn new(
41 run_id: Uuid,
42 cron_name: String,
43 scheduled_time: DateTime<Utc>,
44 timezone: String,
45 is_catch_up: bool,
46 db_pool: sqlx::PgPool,
47 http_client: reqwest::Client,
48 ) -> Self {
49 Self {
50 run_id,
51 cron_name: cron_name.clone(),
52 scheduled_time,
53 execution_time: Utc::now(),
54 timezone,
55 is_catch_up,
56 auth: AuthContext::unauthenticated(),
57 db_pool,
58 http_client,
59 log: CronLog::new(cron_name),
60 env_provider: Arc::new(RealEnvProvider::new()),
61 span: Span::current(),
62 }
63 }
64
65 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
67 self.env_provider = provider;
68 self
69 }
70
71 pub fn db(&self) -> &sqlx::PgPool {
72 &self.db_pool
73 }
74
75 pub fn db_conn(&self) -> crate::function::DbConn<'_> {
77 crate::function::DbConn::Pool(&self.db_pool)
78 }
79
80 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
82 Ok(crate::function::ForgeConn::Pool(
83 self.db_pool.acquire().await?,
84 ))
85 }
86
87 pub fn http(&self) -> &reqwest::Client {
88 &self.http_client
89 }
90
91 pub fn delay(&self) -> chrono::Duration {
93 self.execution_time - self.scheduled_time
94 }
95
96 pub fn is_late(&self) -> bool {
98 self.delay() > chrono::Duration::minutes(1)
99 }
100
101 pub fn with_auth(mut self, auth: AuthContext) -> Self {
103 self.auth = auth;
104 self
105 }
106
107 pub fn trace_id(&self) -> String {
111 self.run_id.to_string()
115 }
116
117 pub fn span(&self) -> &Span {
121 &self.span
122 }
123}
124
125impl EnvAccess for CronContext {
126 fn env_provider(&self) -> &dyn EnvProvider {
127 self.env_provider.as_ref()
128 }
129}
130
/// Logger bound to a single cron name.
///
/// Every event emitted through it carries the stored name as a `cron_name`
/// field. `Debug` is derived so the logger can be printed for diagnostics.
#[derive(Clone, Debug)]
pub struct CronLog {
    /// Name attached to every emitted event.
    cron_name: String,
}
136
137impl CronLog {
138 pub fn new(cron_name: String) -> Self {
140 Self { cron_name }
141 }
142
143 pub fn info(&self, message: &str, data: serde_json::Value) {
145 tracing::info!(
146 cron_name = %self.cron_name,
147 data = %data,
148 "{}",
149 message
150 );
151 }
152
153 pub fn warn(&self, message: &str, data: serde_json::Value) {
155 tracing::warn!(
156 cron_name = %self.cron_name,
157 data = %data,
158 "{}",
159 message
160 );
161 }
162
163 pub fn error(&self, message: &str, data: serde_json::Value) {
165 tracing::error!(
166 cron_name = %self.cron_name,
167 data = %data,
168 "{}",
169 message
170 );
171 }
172
173 pub fn debug(&self, message: &str, data: serde_json::Value) {
175 tracing::debug!(
176 cron_name = %self.cron_name,
177 data = %data,
178 "{}",
179 message
180 );
181 }
182}
183
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a single-connection pool with `connect_lazy`; none of the tests
    /// below run a query against it.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Builds a non-catch-up `test_cron` context in UTC for the given run id
    /// and scheduled time.
    fn context_for(run_id: Uuid, scheduled: DateTime<Utc>) -> CronContext {
        CronContext::new(
            run_id,
            "test_cron".to_string(),
            scheduled,
            "UTC".to_string(),
            false,
            lazy_pool(),
            reqwest::Client::new(),
        )
    }

    /// The constructor stores the arguments it was given.
    #[tokio::test]
    async fn test_cron_context_creation() {
        let run_id = Uuid::new_v4();
        let scheduled = Utc::now() - chrono::Duration::seconds(30);

        let ctx = context_for(run_id, scheduled);

        assert_eq!(ctx.run_id, run_id);
        assert_eq!(ctx.cron_name, "test_cron");
        assert!(!ctx.is_catch_up);
    }

    /// A run scheduled five minutes in the past is late by at least that much.
    #[tokio::test]
    async fn test_cron_delay() {
        let five_minutes = chrono::Duration::minutes(5);
        let scheduled = Utc::now() - five_minutes;

        let ctx = context_for(Uuid::new_v4(), scheduled);

        assert!(ctx.is_late());
        assert!(ctx.delay() >= chrono::Duration::minutes(5));
    }

    /// Logging works without a subscriber installed (smoke test).
    #[test]
    fn test_cron_log() {
        let log = CronLog::new("test_cron".to_string());
        let payload = serde_json::json!({"key": "value"});
        log.info("Test message", payload);
    }
}