use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use tracing::Span;
use uuid::Uuid;
use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
use crate::function::{AuthContext, KvHandle};
use crate::http::CircuitBreakerClient;
#[non_exhaustive]
pub struct CronContext {
pub run_id: Uuid,
pub cron_name: String,
pub scheduled_time: DateTime<Utc>,
pub execution_time: DateTime<Utc>,
pub timezone: String,
pub is_catch_up: bool,
pub auth: AuthContext,
db_pool: sqlx::PgPool,
http_client: CircuitBreakerClient,
http_timeout: Option<Duration>,
env_provider: Arc<dyn EnvProvider>,
span: Span,
kv: Option<Arc<dyn KvHandle>>,
}
impl CronContext {
pub fn new(
run_id: Uuid,
cron_name: impl Into<String>,
scheduled_time: DateTime<Utc>,
timezone: String,
is_catch_up: bool,
db_pool: sqlx::PgPool,
http_client: CircuitBreakerClient,
) -> Self {
let cron_name = cron_name.into();
Self {
run_id,
cron_name,
scheduled_time,
execution_time: Utc::now(),
timezone,
is_catch_up,
auth: AuthContext::unauthenticated(),
db_pool,
http_client,
http_timeout: None,
env_provider: Arc::new(RealEnvProvider::new()),
span: Span::current(),
kv: None,
}
}
pub fn with_kv(mut self, kv: Arc<dyn KvHandle>) -> Self {
self.kv = Some(kv);
self
}
pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
self.kv
.as_deref()
.ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
}
pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
self.env_provider = provider;
self
}
pub fn db(&self) -> crate::function::ForgeDb {
crate::function::ForgeDb::from_pool(&self.db_pool)
}
pub fn db_conn(&self) -> crate::function::DbConn<'_> {
crate::function::DbConn::Pool(self.db_pool.clone())
}
pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
Ok(crate::function::ForgeConn::Pool(
self.db_pool.acquire().await?,
))
}
pub fn http(&self) -> crate::http::HttpClient {
self.http_client.with_timeout(self.http_timeout)
}
pub fn raw_http(&self) -> &reqwest::Client {
self.http_client.inner()
}
pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
self.http_timeout = timeout;
}
pub fn delay(&self) -> chrono::Duration {
self.execution_time - self.scheduled_time
}
pub fn is_late(&self) -> bool {
self.delay() > chrono::Duration::minutes(1)
}
pub fn with_auth(mut self, auth: AuthContext) -> Self {
self.auth = auth;
self
}
pub fn trace_id(&self) -> String {
self.run_id.to_string()
}
pub fn span(&self) -> &Span {
&self.span
}
}
impl EnvAccess for CronContext {
fn env_provider(&self) -> &dyn EnvProvider {
self.env_provider.as_ref()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
use super::*;
use crate::env::MockEnvProvider;
fn make_ctx(scheduled: DateTime<Utc>, is_catch_up: bool) -> CronContext {
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(1)
.connect_lazy("postgres://localhost/nonexistent")
.expect("Failed to create mock pool");
CronContext::new(
Uuid::new_v4(),
"test_cron".to_string(),
scheduled,
"UTC".to_string(),
is_catch_up,
pool,
CircuitBreakerClient::with_defaults(reqwest::Client::new()),
)
}
#[tokio::test]
async fn test_cron_context_creation() {
let scheduled = Utc::now() - chrono::Duration::seconds(30);
let ctx = make_ctx(scheduled, false);
assert_eq!(ctx.cron_name, "test_cron");
assert!(!ctx.is_catch_up);
assert!(!ctx.auth.is_authenticated());
assert!(ctx.delay() >= chrono::Duration::zero());
}
#[tokio::test]
async fn test_cron_delay() {
let scheduled = Utc::now() - chrono::Duration::minutes(5);
let ctx = make_ctx(scheduled, false);
assert!(ctx.is_late());
assert!(ctx.delay() >= chrono::Duration::minutes(5));
}
#[tokio::test]
async fn cron_on_time_is_not_late() {
let ctx = make_ctx(Utc::now() + chrono::Duration::seconds(5), false);
assert!(!ctx.is_late());
}
#[tokio::test]
async fn cron_catch_up_flag_round_trips() {
let ctx = make_ctx(Utc::now() - chrono::Duration::minutes(30), true);
assert!(ctx.is_catch_up);
}
#[tokio::test]
async fn cron_trace_id_returns_run_id_as_string() {
let ctx = make_ctx(Utc::now(), false);
assert_eq!(ctx.trace_id(), ctx.run_id.to_string());
}
#[tokio::test]
async fn cron_with_auth_replaces_default() {
use std::collections::HashMap;
let uid = Uuid::new_v4();
let auth = AuthContext::authenticated(uid, vec!["admin".to_string()], HashMap::new());
let ctx = make_ctx(Utc::now(), false).with_auth(auth);
assert!(ctx.auth.is_authenticated());
assert!(ctx.auth.has_role("admin"));
}
#[tokio::test]
async fn cron_with_env_provider_overrides_real() {
let mut mock = MockEnvProvider::new();
mock.set("FORGE_CRON_KEY", "v");
let ctx = make_ctx(Utc::now(), false).with_env_provider(Arc::new(mock));
use crate::env::EnvAccess;
assert_eq!(ctx.env("FORGE_CRON_KEY"), Some("v".to_string()));
assert_eq!(ctx.env("FORGE_MISSING"), None);
}
#[tokio::test]
async fn cron_set_http_timeout_does_not_panic() {
let mut ctx = make_ctx(Utc::now(), false);
ctx.set_http_timeout(Some(Duration::from_millis(100)));
let _ = ctx.http();
ctx.set_http_timeout(None);
let _ = ctx.http();
}
}