harn-vm 0.8.3

Async bytecode virtual machine for the Harn programming language
Documentation
use std::sync::Arc;

use async_trait::async_trait;
use serde_json::json;
use time::{Duration, OffsetDateTime};
use tokio::sync::Mutex;

use crate::connectors::cron::scheduler::CronSchedule;
use crate::connectors::cron::state::{CronStateStore, PersistedCronState};
use crate::connectors::cron::{
    looks_like_utc_offset, CatchupMode, CronConnector, CronEventSink, EventLogCronEventSink,
};
use crate::connectors::testkit::MemorySecretProvider;
use crate::connectors::{Connector, ConnectorCtx, TriggerBinding};
use crate::event_log::{AnyEventLog, EventLog, FileEventLog, MemoryEventLog, Topic};
use crate::triggers::TriggerEvent;
use crate::{InboxIndex, MetricsRegistry, ProviderId, RateLimiterFactory, TriggerKind};

use super::CRON_TICK_TOPIC;

/// Test double for [`CronEventSink`] that keeps every emitted event in memory
/// so tests can assert on exactly what the connector fired.
struct RecordingSink {
    /// Events received via `emit`, in emission order, not yet drained by `take`.
    events: Mutex<Vec<TriggerEvent>>,
}

impl RecordingSink {
    /// Creates an empty sink, already wrapped in `Arc` so it can be shared with
    /// the connector under test while the test keeps its own handle.
    fn new() -> Arc<Self> {
        Arc::new(Self {
            events: Mutex::new(Vec::new()),
        })
    }

    /// Removes and returns every event recorded since the previous call.
    ///
    /// Draining (rather than cloning a snapshot) matches the method's name and
    /// means each assertion only sees events emitted after the last `take`, so a
    /// duplicate emission cannot hide behind an earlier, already-checked event.
    async fn take(&self) -> Vec<TriggerEvent> {
        std::mem::take(&mut *self.events.lock().await)
    }
}

#[async_trait]
impl CronEventSink for RecordingSink {
    /// Appends `event` to the in-memory buffer instead of delivering it.
    ///
    /// The binding id and retention window are ignored on purpose: tests using
    /// this sink only inspect which events were emitted. This never fails.
    async fn emit(
        &self,
        _binding_id: &str,
        _retention: std::time::Duration,
        event: TriggerEvent,
    ) -> Result<(), crate::connectors::ConnectorError> {
        let mut recorded = self.events.lock().await;
        recorded.push(event);
        Ok(())
    }
}

/// Builds a cron [`TriggerBinding`] with the given id for tests.
///
/// `schedule`, `timezone` and `catchup_mode` go into the binding's JSON config.
/// Both the inbox dedupe retention and the config's `retention_days` use the
/// crate's `DEFAULT_INBOX_RETENTION_DAYS`. No dedupe key is set.
fn binding(id: &str, schedule: &str, timezone: &str, catchup_mode: CatchupMode) -> TriggerBinding {
    let retention_days = crate::triggers::DEFAULT_INBOX_RETENTION_DAYS;
    let config = json!({
        "schedule": schedule,
        "timezone": timezone,
        "catchup_mode": catchup_mode,
        "retention_days": retention_days,
    });

    TriggerBinding {
        provider: ProviderId::from("cron"),
        kind: TriggerKind::from("cron"),
        binding_id: id.to_owned(),
        dedupe_key: None,
        dedupe_retention_days: retention_days,
        config,
    }
}

/// Assembles a [`ConnectorCtx`] around `event_log` for tests.
///
/// The inbox index is built over the same log and shares a freshly created
/// metrics registry with the context. Secrets come from a `MemorySecretProvider`
/// created with `"cron"`, and the rate limiter factory is the default one.
///
/// # Panics
///
/// Panics if `InboxIndex::new` returns an error.
async fn ctx(event_log: Arc<AnyEventLog>) -> ConnectorCtx {
    let metrics = Arc::new(MetricsRegistry::default());
    let inbox = InboxIndex::new(Arc::clone(&event_log), Arc::clone(&metrics))
        .await
        .unwrap();

    ConnectorCtx {
        inbox: Arc::new(inbox),
        event_log,
        secrets: Arc::new(MemorySecretProvider::new("cron")),
        metrics,
        rate_limiter: Arc::new(RateLimiterFactory::default()),
    }
}

/// Parses an RFC 3339 timestamp literal used as a test fixture.
///
/// # Panics
///
/// Panics if `ts` is not valid RFC 3339. The panic names the offending literal,
/// so a typo in a fixture is obvious from the failure output.
fn parse(ts: &str) -> OffsetDateTime {
    OffsetDateTime::parse(ts, &time::format_description::well_known::Rfc3339)
        .unwrap_or_else(|err| panic!("invalid RFC 3339 fixture timestamp {ts:?}: {err}"))
}

/// `0 0 * * *` in America/New_York must follow local midnight across DST.
/// That is 05:00Z while EST (UTC-5) applies in January, and 04:00Z while EDT
/// (UTC-4) applies in July.
#[test]
fn midnight_schedule_tracks_new_york_local_midnight_in_dst_and_standard_time() {
    let schedule = CronSchedule::parse("0 0 * * *", "America/New_York".parse().unwrap()).unwrap();

    // Each case: (one minute before local midnight, expected next tick), both UTC.
    let cases = [
        ("2026-01-15T04:59:00Z", "2026-01-15T05:00:00Z"),
        ("2026-07-15T03:59:00Z", "2026-07-15T04:00:00Z"),
    ];
    for (probe, expected) in cases {
        let next = schedule.next_tick_after(parse(probe)).unwrap();
        assert_eq!(next, parse(expected));
    }
}

/// On 2026-11-01 New York falls back from EDT to EST, so 01:00 local occurs
/// twice: at 05:00Z (EDT) and again at 06:00Z (EST). A daily `0 1 * * *`
/// schedule must yield only the first occurrence in a window spanning both.
#[test]
fn fallback_hour_fires_only_once() {
    let schedule = CronSchedule::parse("0 1 * * *", "America/New_York".parse().unwrap()).unwrap();
    let window_start = parse("2026-11-01T04:59:00Z");
    let window_end = parse("2026-11-01T07:01:00Z");

    let due = schedule
        .due_ticks_between(Some(window_start), window_end)
        .unwrap();

    let first_one_am = parse("2026-11-01T05:00:00Z");
    assert_eq!(due, vec![first_one_am]);
}

/// On 2026-03-08 New York jumps from 02:00 EST (07:00Z) straight to 03:00 EDT,
/// so 02:00 local never happens. A daily `0 2 * * *` schedule must produce no
/// tick in a window around that jump.
#[test]
fn spring_forward_gap_does_not_fire_missing_hour() {
    let schedule = CronSchedule::parse("0 2 * * *", "America/New_York".parse().unwrap()).unwrap();
    let window_start = parse("2026-03-08T06:59:00Z");
    let window_end = parse("2026-03-08T08:01:00Z");

    let due = schedule
        .due_ticks_between(Some(window_start), window_end)
        .unwrap();

    assert!(due.is_empty());
}

/// State persisted through a file-backed event log must be readable from a
/// freshly opened log over the same directory, with the same `last_fired_at`.
#[tokio::test(flavor = "current_thread")]
async fn durable_state_round_trips_through_event_log() {
    let tmp = tempfile::tempdir().unwrap();
    let first = Arc::new(AnyEventLog::File(
        FileEventLog::open(tmp.path().to_path_buf(), 32).unwrap(),
    ));
    let store = CronStateStore::new(first);
    store
        .persist(PersistedCronState {
            trigger_id: "nightly".to_string(),
            last_fired_at: parse("2026-04-19T00:10:00Z"),
        })
        .await
        .unwrap();
    // `first` was moved into `store`, so the first log stays alive as long as
    // `store` does. Drop it before reopening so this models a real restart
    // instead of two live handles on the same directory.
    drop(store);

    let reopened = Arc::new(AnyEventLog::File(
        FileEventLog::open(tmp.path().to_path_buf(), 32).unwrap(),
    ));
    let restored = CronStateStore::new(reopened)
        .load("nightly")
        .await
        .unwrap()
        .unwrap();

    assert_eq!(restored.last_fired_at, parse("2026-04-19T00:10:00Z"));
}

/// `CatchupMode::Skip`: persisted state says the trigger last fired at 00:00 and
/// the mock clock starts at 00:10. Activation must not replay any tick from
/// that gap.
#[tokio::test(flavor = "current_thread")]
async fn catchup_skip_drops_missed_ticks() {
    let clock = crate::connectors::test_util::MockClock::new(parse("2026-04-19T00:10:00Z"));
    let recorder = RecordingSink::new();
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));

    let state_store = CronStateStore::new(Arc::clone(&log));
    let last_fired = PersistedCronState {
        trigger_id: "hourly".to_string(),
        last_fired_at: parse("2026-04-19T00:00:00Z"),
    };
    state_store.persist(last_fired).await.unwrap();

    let mut connector = CronConnector::with_clock_and_sink(clock.clone(), Arc::clone(&recorder));
    let connector_ctx = ctx(log).await;
    connector.init(connector_ctx).await.unwrap();
    let bindings = [binding("hourly", "* * * * *", "UTC", CatchupMode::Skip)];
    connector.activate(&bindings).await.unwrap();
    // Give any work spawned by activation a chance to run before inspecting.
    tokio::task::yield_now().await;

    let emitted = recorder.take().await;
    assert!(emitted.is_empty());
}

/// `CatchupMode::All`: with the last fire persisted at 00:00 and the clock at
/// 00:10, activation must replay every per-minute tick in (00:00, 00:10] exactly
/// once, oldest first.
#[tokio::test(flavor = "current_thread")]
async fn catchup_all_replays_every_missed_tick_in_order() {
    let clock = crate::connectors::test_util::MockClock::new(parse("2026-04-19T00:10:00Z"));
    let sink = RecordingSink::new();
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
    let store = CronStateStore::new(log.clone());
    store
        .persist(PersistedCronState {
            trigger_id: "hourly".to_string(),
            last_fired_at: parse("2026-04-19T00:00:00Z"),
        })
        .await
        .unwrap();

    let mut connector = CronConnector::with_clock_and_sink(clock.clone(), sink.clone());
    connector.init(ctx(log).await).await.unwrap();
    connector
        .activate(&[binding("hourly", "* * * * *", "UTC", CatchupMode::All)])
        .await
        .unwrap();
    tokio::task::yield_now().await;

    // Compare the whole sequence, not just its endpoints. That way a gap, a
    // duplicate or out-of-order ticks in the middle are caught, which is what
    // "every ... in order" promises.
    let expected: Vec<_> = (1..=10)
        .map(|minute| Some(parse(&format!("2026-04-19T00:{minute:02}:00Z"))))
        .collect();
    let actual: Vec<_> = sink
        .take()
        .await
        .into_iter()
        .map(|event| event.occurred_at)
        .collect();
    assert_eq!(actual, expected);
}

/// `CatchupMode::Latest`: after a gap from 00:00 (last persisted fire) to 00:10
/// (clock), activation must replay only the newest missed tick, 00:10.
#[tokio::test(flavor = "current_thread")]
async fn catchup_latest_replays_only_the_most_recent_tick() {
    let clock = crate::connectors::test_util::MockClock::new(parse("2026-04-19T00:10:00Z"));
    let recorder = RecordingSink::new();
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));

    let state_store = CronStateStore::new(Arc::clone(&log));
    let last_fired = PersistedCronState {
        trigger_id: "hourly".to_string(),
        last_fired_at: parse("2026-04-19T00:00:00Z"),
    };
    state_store.persist(last_fired).await.unwrap();

    let mut connector = CronConnector::with_clock_and_sink(clock.clone(), Arc::clone(&recorder));
    let connector_ctx = ctx(log).await;
    connector.init(connector_ctx).await.unwrap();
    let bindings = [binding("hourly", "* * * * *", "UTC", CatchupMode::Latest)];
    connector.activate(&bindings).await.unwrap();
    // Give any work spawned by activation a chance to run before inspecting.
    tokio::task::yield_now().await;

    let replayed = recorder.take().await;
    assert_eq!(replayed.len(), 1);
    let newest = parse("2026-04-19T00:10:00Z");
    assert_eq!(replayed[0].occurred_at, Some(newest));
}

/// A connector fires the 00:01:00 tick and is then restarted at 00:01:30 over
/// the same file-backed log. After the restart it must not fire 00:01:00 again,
/// and it must still fire the next live tick, 00:02:00.
///
/// NOTE(review): both runs use `CatchupMode::Skip`, which presumably never
/// replays missed ticks. The "no duplicate" check may therefore pass even if
/// durable state were not restored. Consider a catch-up mode that depends on the
/// persisted `last_fired_at` to make the state load-bearing. TODO confirm.
#[tokio::test(flavor = "current_thread")]
async fn restart_uses_durable_state_to_avoid_duplicate_tick() {
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().to_path_buf();

    // First run: start at 00:00:30 and advance across the 00:01:00 tick.
    let sink_one = RecordingSink::new();
    let clock_one = crate::connectors::test_util::MockClock::new(parse("2026-04-19T00:00:30Z"));
    let log_one = Arc::new(AnyEventLog::File(
        FileEventLog::open(path.clone(), 32).unwrap(),
    ));

    let mut first = CronConnector::with_clock_and_sink(clock_one.clone(), sink_one.clone());
    first.init(ctx(log_one).await).await.unwrap();
    first
        .activate(&[binding("hourly", "* * * * *", "UTC", CatchupMode::Skip)])
        .await
        .unwrap();
    clock_one.advance(Duration::seconds(30)).await;
    tokio::task::yield_now().await;
    let first_run = sink_one.take().await;
    assert_eq!(first_run.len(), 1);
    // Pin which tick fired. The restart below relies on 00:01:00 being the last
    // recorded fire, so the next tick it may emit is 00:02:00.
    assert_eq!(first_run[0].occurred_at, Some(parse("2026-04-19T00:01:00Z")));
    drop(first);

    // Second run: restart at 00:01:30 on the same directory.
    let sink_two = RecordingSink::new();
    let clock_two = crate::connectors::test_util::MockClock::new(parse("2026-04-19T00:01:30Z"));
    let log_two = Arc::new(AnyEventLog::File(FileEventLog::open(path, 32).unwrap()));
    let mut second = CronConnector::with_clock_and_sink(clock_two.clone(), sink_two.clone());
    second.init(ctx(log_two).await).await.unwrap();
    second
        .activate(&[binding("hourly", "* * * * *", "UTC", CatchupMode::Skip)])
        .await
        .unwrap();
    tokio::task::yield_now().await;
    assert!(sink_two.take().await.is_empty());
    clock_two.advance(Duration::seconds(30)).await;
    tokio::task::yield_now().await;

    let events = sink_two.take().await;
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].occurred_at, Some(parse("2026-04-19T00:02:00Z")));
}

/// A connector built with `with_clock` (no injected sink) should record a fired
/// tick as exactly one entry on the `CRON_TICK_TOPIC` topic of the event log.
#[tokio::test(flavor = "current_thread")]
async fn default_sink_writes_trigger_events_to_event_log() {
    let clock = crate::connectors::test_util::MockClock::new(parse("2026-04-19T00:00:30Z"));
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));

    let mut connector = CronConnector::with_clock(clock.clone());
    let connector_ctx = ctx(Arc::clone(&log)).await;
    connector.init(connector_ctx).await.unwrap();
    let bindings = [binding("hourly", "* * * * *", "UTC", CatchupMode::Skip)];
    connector.activate(&bindings).await.unwrap();

    // Move from 00:00:30 across the 00:01:00 tick, then let the tick be processed.
    clock.advance(Duration::seconds(30)).await;
    tokio::task::yield_now().await;

    let tick_topic = Topic::new(CRON_TICK_TOPIC).unwrap();
    let written = log.read_range(&tick_topic, None, usize::MAX).await.unwrap();
    assert_eq!(written.len(), 1);
}

/// One emission through `EventLogCronEventSink` must leave exactly one entry on
/// the trigger-inbox claims topic. The sink's post-processing must not claim the
/// dedupe key a second time.
#[tokio::test(flavor = "current_thread")]
async fn default_sink_does_not_double_claim_dedupe() {
    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
    let metrics = Arc::new(MetricsRegistry::default());
    let inbox = InboxIndex::new(Arc::clone(&log), Arc::clone(&metrics))
        .await
        .unwrap();
    let sink = EventLogCronEventSink::new(Arc::clone(&log), Arc::new(inbox));

    let hourly = binding("hourly", "* * * * *", "UTC", CatchupMode::Skip);
    let trigger = super::CronTrigger::from_binding(&hourly).unwrap();
    let retention = trigger.dedupe_retention;
    let tick = trigger.to_event(parse("2026-04-19T00:01:00Z"), false);
    sink.emit("hourly", retention, tick).await.unwrap();

    let claims_topic = Topic::new(crate::triggers::TRIGGER_INBOX_CLAIMS_TOPIC).unwrap();
    let claims = log
        .read_range(&claims_topic, None, usize::MAX)
        .await
        .unwrap();
    assert_eq!(
        claims.len(),
        1,
        "cron post-processing should not re-claim inbox dedupe"
    );
}

/// `looks_like_utc_offset` returns true for offset-style strings ("+02:00",
/// "UTC-5") and false for named zones ("UTC", "America/New_York").
#[test]
fn utc_offset_detection_rejects_offset_style_timezones() {
    for offset_like in ["+02:00", "UTC-5"] {
        assert!(looks_like_utc_offset(offset_like));
    }
    for named_zone in ["UTC", "America/New_York"] {
        assert!(!looks_like_utc_offset(named_zone));
    }
}

/// Even with no cron state persisted, re-emitting an already delivered tick
/// after a restart must be rejected by the inbox's durable dedupe. The restart
/// reopens the file-backed log and rebuilds the inbox. Afterwards the tick topic
/// must hold one event, and the fresh metrics must show one rejected duplicate
/// and one durable hit.
#[tokio::test(flavor = "current_thread")]
async fn inbox_dedupe_survives_restart_without_cron_state() {
    let tmp = tempfile::tempdir().unwrap();
    let path = tmp.path().to_path_buf();
    let tick_at = parse("2026-04-19T00:02:00Z");

    // First process: emit the tick once through a file-backed log.
    let log_one = Arc::new(AnyEventLog::File(
        FileEventLog::open(path.clone(), 32).unwrap(),
    ));
    let metrics_one = Arc::new(MetricsRegistry::default());
    let inbox_one = Arc::new(
        InboxIndex::new(log_one.clone(), metrics_one.clone())
            .await
            .unwrap(),
    );
    let sink_one = EventLogCronEventSink::new(log_one.clone(), inbox_one.clone());
    let hourly_binding = binding("hourly", "* * * * *", "UTC", CatchupMode::Latest);
    let trigger = super::CronTrigger::from_binding(&hourly_binding).unwrap();
    sink_one
        .emit(
            "hourly",
            trigger.dedupe_retention,
            trigger.to_event(tick_at, false),
        )
        .await
        .unwrap();
    // Simulate shutdown of the first process. `sink_one` owns its own clones of
    // `log_one` and `inbox_one`, so it must be dropped too. Otherwise the first
    // log stays alive while a second one is opened on the same directory, and
    // the "restart" is not a real one.
    drop(sink_one);
    drop(inbox_one);
    drop(log_one);

    // Second process: reopen the same directory with a fresh inbox and metrics.
    let log_two = Arc::new(AnyEventLog::File(FileEventLog::open(path, 32).unwrap()));
    let metrics_two = Arc::new(MetricsRegistry::default());
    let sink_two = EventLogCronEventSink::new(
        log_two.clone(),
        Arc::new(
            InboxIndex::new(log_two.clone(), metrics_two.clone())
                .await
                .unwrap(),
        ),
    );
    sink_two
        .emit(
            "hourly",
            trigger.dedupe_retention,
            trigger.to_event(tick_at, false),
        )
        .await
        .unwrap();

    let topic = Topic::new(CRON_TICK_TOPIC).unwrap();
    let events = log_two.read_range(&topic, None, usize::MAX).await.unwrap();
    assert_eq!(events.len(), 1);
    let snapshot = metrics_two.snapshot();
    assert_eq!(snapshot.inbox_duplicates_rejected, 1);
    assert_eq!(snapshot.inbox_durable_hits, 1);
}