Skip to main content

autumn_harvest/
lib.rs

1//! Durable workflow orchestration engine core.
2
/// Emits the wrapped items only when this crate's `db` cargo feature is
/// enabled.
///
/// With `db` on, every item passed to `cfg_db!` is emitted unchanged; with
/// `db` off, the items are discarded. The `cfg` attribute selects between two
/// macro definitions while *this* crate is being compiled. The decision
/// therefore follows `autumn_harvest`'s own feature set, not the feature set
/// of the crate that invokes the macro.
#[cfg(feature = "db")]
#[macro_export]
macro_rules! cfg_db {
    ($($item:item)*) => {
        $($item)*
    };
}

/// Emits the wrapped items only when this crate's `db` cargo feature is
/// enabled.
///
/// This is the `db`-disabled definition: every item passed in is discarded.
#[cfg(not(feature = "db"))]
#[macro_export]
macro_rules! cfg_db {
    ($($item:item)*) => {};
}

/// Embedded migrations for the harvest engine schema.
///
/// Downstream crates (such as `autumn-harvest-plugin`) should consume this
/// const instead of invoking `diesel_migrations::embed_migrations!()` against
/// this crate's path. Proc-macro path arguments are resolved at the *consumer's*
/// compile-time location. That location doesn't survive `cargo publish`,
/// because the upstream crate's directory layout isn't available in the
/// downstream's packaged tarball. Centralising the macro invocation here keeps
/// the path resolution local to this crate, where the `migrations/` directory
/// always ships alongside.
#[cfg(feature = "db")]
pub const MIGRATIONS: diesel_migrations::EmbeddedMigrations =
    diesel_migrations::embed_migrations!();

// Consume the env var emitted by build.rs so that Cargo recompiles this
// crate (and re-runs embed_migrations!()) whenever a migration is added or
// removed — even when only the migrations/ directory changed and no Rust
// source file was edited.
#[cfg(feature = "db")]
const _MIGRATIONS_LIST: &str = env!("HARVEST_MIGRATIONS_LIST");
37
38/// Admission gate primitive for incident-response operators (issue #377).
39pub mod admission_gate;
40/// History analyzer and linter.
41pub mod analyzer;
42/// Audit trail for management API mutations (issue #158).
43#[cfg(feature = "db")]
44pub mod audit;
45/// Batch operations for fleet-wide workflow cancel/terminate/signal (issue #102).
46pub mod batch;
47/// Batch workflow start types: caps and per-item request/result structs (issue #357).
48pub mod batch_start;
49/// Worker build-id routing for safe rolling deploys (issue #171).
50#[cfg(feature = "db")]
51pub mod build_routing;
52pub mod builder;
53pub mod cache;
54/// Calendar-aware schedule filtering: named exclusion sets, skip policies, and
55/// schedule preview generation (issue #337).
56pub mod calendar;
57/// Per-activity circuit breaker that fast-fails dispatch during downstream
58/// outages (issue #369).
59pub mod circuit_breaker;
60pub mod completion_trigger;
61/// Per-key concurrency limits for tenant fair-share scheduling (issue #247).
62pub mod concurrency;
63pub mod context;
64pub mod critical_path;
65pub mod dag;
66/// Export format types for Directed Acyclic Graphs (DAGs) representing workflows.
67pub mod dag_export;
68pub mod dag_linter;
69#[cfg(feature = "testing")]
70pub mod dag_profiler;
71#[cfg(any(test, feature = "testing"))]
72pub mod dag_simulator;
73/// Deterministic workflow guardrails: static source-level check for replay-breaking patterns.
74pub mod det_check;
75pub mod diagnostic;
76pub mod eligibility;
77pub mod error;
78pub mod event;
79#[cfg(feature = "db")]
80#[doc(hidden)]
81pub mod execution;
82pub mod executor;
83#[cfg(feature = "db")]
84pub mod external_task;
85pub mod failure;
86/// Deterministic workflow guardrail rule catalog (issue #173).
87pub mod guardrail;
88#[cfg(feature = "db")]
89pub mod handle;
90#[cfg(feature = "db")]
91pub mod handle_typed;
92pub mod history_export;
93pub mod info;
94/// `metrics` crate adapter for [`telemetry::MetricsRecorder`].
95///
96/// Bridges every `record_*` call to the global [`metrics`] registry so
97/// applications using `metrics-exporter-prometheus` or another compatible
98/// backend get harvest engine metrics without any extra glue code.
99///
100/// Enabled by the `metrics-rs` cargo feature.
101#[cfg(feature = "metrics-rs")]
102pub mod metrics_rs_adapter;
103pub mod payload_codec;
104pub mod poison_pill;
105pub mod policy;
106pub mod pool;
107pub mod prelude;
108/// Types and definitions for querying workflow state and metadata.
109pub mod query;
110pub mod replay;
111#[cfg(feature = "db")]
112pub mod reset;
113pub mod retention;
114pub mod saga;
115pub mod shard;
116pub mod simulator;
117/// OpenTelemetry integration: trace-context propagation and metrics.
118pub mod telemetry;
119#[cfg(any(test, feature = "testing"))]
120pub mod test_generator;
121/// Replay test harness for verifying workflow determinism pre-deploy.
122#[cfg(any(test, feature = "testing"))]
123pub mod testing;
124pub mod types;
125pub mod update;
126#[cfg(feature = "db")]
127pub mod version_gate_retirement;
128#[cfg(feature = "db")]
129pub mod version_usage;
130
131#[cfg(feature = "db")]
132#[doc(hidden)]
133pub mod dlq;
134#[cfg(feature = "db")]
135#[doc(hidden)]
136pub mod heartbeat;
137#[cfg(feature = "db")]
138#[doc(hidden)]
139pub mod models;
140#[cfg(feature = "db")]
141#[doc(hidden)]
142pub mod notify;
143#[cfg(feature = "db")]
144#[doc(hidden)]
145pub mod queue;
146#[cfg(feature = "db")]
147pub mod schedule_decision;
148#[cfg(feature = "db")]
149pub mod scheduler;
150#[cfg(feature = "db")]
151#[allow(clippy::wildcard_imports)]
152#[doc(hidden)]
153pub mod schema;
154#[cfg(feature = "db")]
155#[doc(hidden)]
156pub mod signal;
157#[cfg(feature = "db")]
158#[doc(hidden)]
159pub mod store;
160#[cfg(feature = "db")]
161#[doc(hidden)]
162pub mod timeout;
163#[cfg(feature = "db")]
164#[doc(hidden)]
165pub mod worker;
166#[cfg(feature = "db")]
167pub mod workers;
168
169pub use admission_gate::{
170    AdmissionGate, AdmissionGateCache, AdmissionGateId, AdmissionGateView, GateScope,
171    MAX_ACTIVE_GATES, check_admission,
172};
173pub use analyzer::{
174    AnalyzerRule, AnalyzerWarning, ExcessiveRetriesRule, HistoryAnalyzer, LargePayloadRule,
175    SuspiciousTimerRule,
176};
177pub use builder::{
178    BuiltHarvest, HarvestBuilder, HarvestBuilderError, StickyRoutingConfig, WorkerConfig,
179};
180pub use cache::{CachedWorkflowState, WorkflowCache};
181#[cfg(feature = "db")]
182pub use calendar::{
183    BackfillSlot, create_calendar, delete_calendar, get_calendar, list_calendars,
184    load_exclusions_for_calendar, plan_backfill_with_calendar, preview_schedule_firings,
185    replace_calendar_exclusions,
186};
187pub use calendar::{
188    Calendar, ScheduleFirePreview, apply_skip_policy, calendar_excludes_weekends, is_excluded_date,
189};
190pub use completion_trigger::{CompletionTrigger, InputMapping, TerminalState};
191pub use context::{
192    ActivityContext, DEFAULT_HISTORY_CONTINUE_AS_NEW_THRESHOLD, WorkflowCommand, WorkflowContext,
193    WorkflowHistoryPolicy,
194};
195pub use critical_path::{CriticalPathAnalyzer, CriticalPathResult};
196pub use dag::{
197    DagBuildError, DagBuilder, DagCondition, DagDefinition, DagDispatchDecision, DagMapTaskRef,
198    DagTask, DagTaskRef,
199};
200pub use dag_export::{export_dot, export_mermaid};
201pub use dag_linter::{
202    DagLinter, DagRule, DagWarning, ExcessiveParallelismRule, MissingRetryPolicyRule,
203    MissingTimeoutRule,
204};
205#[cfg(feature = "testing")]
206pub use dag_profiler::{DagProfile, DagProfiler, ProfilerEvent, ProfilerEventKind};
207#[cfg(any(test, feature = "testing"))]
208pub use dag_simulator::{DagSimulator, DagSimulatorResult};
209pub use det_check::{
210    DetCheckReport, DetFinding, DetLocation, DetSeverity, DetSuppression, check_dir, check_file,
211    check_source,
212};
213pub use diagnostic::{DiagnosticReport, SimulatorResultExt};
214pub use error::{HarvestError, HarvestResult, TimeoutType};
215pub use event::{SideEffectKind, WorkflowEvent};
216#[cfg(feature = "db")]
217pub use execution::{
218    CancelledWorkflowExecution, PausedWorkflowExecution, ResumedWorkflowExecution,
219    SignalWithStartOutcome, SignalWithStartParams, StartWorkflowParams, StartedWorkflowExecution,
220    UpdateWithStartOutcome, UpdateWithStartParams, auto_resume_expired_pauses,
221    cancel_workflow_execution, pause_workflow_execution, resume_workflow_execution,
222    signal_with_start_workflow_execution, start_or_load_workflow_execution,
223    terminate_workflow_execution, update_with_start_workflow_execution,
224};
225pub use executor::{WorkflowOutcome, run_workflow};
226pub use guardrail::{
227    GuardrailFinding, GuardrailSuppression, GuardrailSuppressionError, RuleCategory, RuleEntry,
228    Severity, catalog as guardrail_catalog, rule_by_id as guardrail_rule_by_id,
229};
230#[cfg(feature = "db")]
231pub use handle::{
232    StartedWorkflowHandle, WorkflowHandle, WorkflowHandleClient, WorkflowResult,
233    WorkflowResultState, start_or_load_workflow_execution_with_handle,
234};
235#[cfg(feature = "db")]
236pub use handle_typed::{
237    TypedSignalWithStartOptions, TypedStartOptions, TypedUpdateWithStartOptions,
238    TypedWorkflowHandle, TypedWorkflowResult,
239};
240pub use history_export::{
241    DEFAULT_HISTORY_EXPORT_MAX_BYTES, HISTORY_EXPORT_SCHEMA, HISTORY_EXPORT_VERSION,
242    HistoryExportDocument, HistoryExportError, HistoryExportRequest, HistoryExportSizeLimit,
243    HistoryExportStatus, HistoryPayloadPolicy, export_history, export_mermaid_sequence,
244};
245pub use info::{
246    ActivityHandlerFn, ActivityInfo, DagInfo, QueryHandlerFn, QueryHandlerInfo, UpdateHandlerFn,
247    UpdateHandlerInfo, UpdateValidatorFn, WorkflowHandlerFn, WorkflowInfo,
248};
249pub use payload_codec::{CodecError, IdentityCodec, PayloadCodec, PayloadCodecs};
250pub use policy::validate_schedule;
251pub use policy::{
252    CatchupPolicy, MapFailurePolicy, OverlapPolicy, RetryPolicy, Schedule, SkipPolicy, TaskStatus,
253    TriggerRule, WorkflowSchedule,
254};
255pub use pool::{HarvestPoolConfig, compute_pool_sizes};
256pub use query::QueryRegistry;
257pub use replay::{HistoryMatch, HistoryMatcher, SignalOrTimerMatch};
258#[cfg(feature = "db")]
259pub use reset::{
260    ResetInvalidPoint, ResetPlan, ResetResult, ResetSignalReapplyPolicy, ResetUnresolvedSideEffect,
261    WorkflowResetError, WorkflowResetRequest, preview_workflow_reset, reset_workflow_execution,
262    validate_reset_point,
263};
264pub use retention::{ArchiverFuture, HistoryArchiver, RetentionConfig};
265#[cfg(feature = "db")]
266pub use retention::{RetentionMonitor, RetentionRuntime, RetentionStatus, RetentionTickResult};
267pub use saga::Saga;
268#[cfg(feature = "db")]
269pub use schedule_decision::record_decision_graceful;
270#[cfg(feature = "db")]
271pub use scheduler::{
272    DagCatalog, RegisteredDag, SchedulerMonitor, SchedulerRuntime, compile_dag_catalog,
273    register_schedules, register_workflow_schedules, tick_once, trigger_unified_dag,
274};
275pub use shard::ShardRouter;
276#[cfg(feature = "db")]
277pub use shard::ShardedDbPool;
278pub use simulator::{SimulatorResult, WorkflowSimulator};
279#[cfg(feature = "db")]
280pub use store::AwaitMode;
281pub use telemetry::{
282    ActivityStatus, MetricsRecorder, NoOpMetrics, NoOpPropagator, TelemetryConfig,
283    TelemetryConfigBuilder, TraceContextCarrier, TraceContextPropagator, WorkflowStatus,
284};
285#[cfg(any(test, feature = "testing"))]
286pub use test_generator::TestHarnessGenerator;
287#[cfg(feature = "testing")]
288pub use testing::{
289    BatchReplayReport, CiReport, FailOnMode, FixtureResult, FixtureStatus, HarnessErrorKind,
290    ReplayVerifier, ReportFormat, TestRunOutcome, WorkflowTestEnv,
291};
292#[cfg(any(test, feature = "testing"))]
293pub use testing::{
294    HistorySnapshot, NonDeterminismKind, ReplayReport, ReplayStatus, WorkflowReplayer,
295};
296pub use types::{
297    ActivityExecId, BuildId, DeploymentName, ExecutionId, ExternalActivityToken, ExternalCancelId,
298    ExternalSignalId, ParentClosePolicy, Priority, ShardId, TimerId, UpdateId, WorkerId,
299    WorkflowId, WorkflowIdReusePolicy,
300};
301pub use update::UpdateRegistry;
302#[cfg(feature = "db")]
303pub use version_usage::{
304    VersionExecutionStateGroup, VersionUsageFilters, VersionUsageShardRow, load_version_usage,
305};
306
307#[cfg(feature = "db")]
308pub use store::EventHistory;
309
310#[cfg(feature = "db")]
311pub use models::{AuditRecord, NewAuditRecord};
312
313#[cfg(feature = "db")]
314pub use diesel;
315#[cfg(feature = "db")]
316pub use diesel_async;
317
318#[cfg(feature = "db")]
319pub use queue::{ConcurrencyKeyStats, QueueScalingSignal, QueueTaskCounts, queue_task_counts};
320
321// Allow macro-generated code to use ::autumn_harvest::serde_json
322pub use serde_json;
323// Allow macro-generated code to use ::autumn_harvest::serde
324pub use serde;
325// Allow macro-generated code to use ::autumn_harvest::chrono
326pub use chrono;
327// Allow macro-generated code to use ::autumn_harvest::uuid (e.g. update_with_start UUIDv5 derivation)
328pub use uuid;
329// Allow macro-generated code to use ::autumn_harvest::futures.
330#[doc(hidden)]
331pub use futures;
332
/// Parse a human-readable byte-size string like `"2MiB"`, `"256KiB"`, `"4MB"`.
///
/// Leading and trailing whitespace is ignored, as is whitespace between the
/// number and its unit (`"4 MB"`). Recognised (case-sensitive) units are:
///
/// - binary suffixes `KiB`, `MiB`, `GiB` (powers of 1024);
/// - decimal suffixes `KB`, `MB`, `GB` (powers of 1000).
///
/// With no suffix, the number is taken as plain bytes.
///
/// Returns `None` in these cases:
///
/// - the input is empty;
/// - the numeric part is rejected by `u64`'s `FromStr` impl, which covers
///   unrecognised suffixes and fractional values;
/// - the resulting size overflows `u64`.
///
/// ## Examples
///
/// ```rust
/// assert_eq!(autumn_harvest::parse_byte_size("2MiB"), Some(2 * 1024 * 1024));
/// assert_eq!(autumn_harvest::parse_byte_size("256KiB"), Some(256 * 1024));
/// assert_eq!(autumn_harvest::parse_byte_size("4MB"), Some(4_000_000));
/// assert_eq!(autumn_harvest::parse_byte_size("invalid"), None);
/// ```
#[must_use]
pub fn parse_byte_size(s: &str) -> Option<u64> {
    // Suffix -> multiplier. No entry is a suffix of another entry, so at most
    // one of them can match any given input.
    const UNITS: [(&str, u64); 6] = [
        ("GiB", 1 << 30),
        ("GB", 1_000_000_000),
        ("MiB", 1 << 20),
        ("MB", 1_000_000),
        ("KiB", 1 << 10),
        ("KB", 1_000),
    ];

    let input = s.trim();
    if input.is_empty() {
        return None;
    }

    let (number, multiplier) = UNITS
        .iter()
        .find_map(|&(suffix, factor)| {
            input
                .strip_suffix(suffix)
                .map(|rest| (rest.trim(), factor))
        })
        .unwrap_or((input, 1));

    number.parse::<u64>().ok()?.checked_mul(multiplier)
}
371
/// Parse a human-readable duration string such as `"5m"`, `"30s"`, `"1h"`, or
/// `"1h 30m"`.
///
/// The input is a sequence of `<number><unit>` pairs whose values are summed.
/// Units are the lowercase letters `s` (seconds), `m` (minutes), `h` (hours)
/// and `d` (days). ASCII spaces are skipped wherever they appear, including
/// between digits, so `"1 0s"` reads as ten seconds.
///
/// Returns `None` in these cases:
///
/// - any other character appears;
/// - a unit letter has no preceding number;
/// - digits are left over at the end without a unit;
/// - the total is zero seconds;
/// - any value overflows `u64` seconds.
///
/// Used by macro-generated code — not intended for direct use.
#[doc(hidden)]
#[must_use]
pub fn task_duration(s: &str) -> Option<std::time::Duration> {
    // Value of the number currently being read; `None` until a digit has been
    // seen since the start of input or the previous unit letter.
    let mut pending: Option<u64> = None;
    let mut total_secs: u64 = 0;

    for ch in s.chars() {
        if let Some(digit) = ch.to_digit(10) {
            // Accumulate numerically. A digit run too large for `u64` can never
            // become a valid duration, so bail out as soon as it overflows
            // instead of buffering the rest of the run.
            let value = pending
                .unwrap_or(0)
                .checked_mul(10)?
                .checked_add(u64::from(digit))?;
            pending = Some(value);
            continue;
        }

        let unit_secs: u64 = match ch {
            ' ' => continue,
            's' => 1,
            'm' => 60,
            'h' => 3_600,
            'd' => 86_400,
            _ => return None,
        };
        // A unit letter must be preceded by at least one digit.
        let amount = pending.take()?;
        total_secs = total_secs.checked_add(amount.checked_mul(unit_secs)?)?;
    }

    // Leftover digits mean the string ended without a unit.
    if pending.is_some() || total_secs == 0 {
        return None;
    }
    Some(std::time::Duration::from_secs(total_secs))
}
411
#[cfg(test)]
mod tests {
    use super::{parse_byte_size, task_duration};
    use std::time::Duration;

    #[cfg(feature = "db")]
    #[test]
    fn embedded_migrations_include_one_harvest_initial() {
        use diesel::migration::MigrationSource;
        use diesel::pg::Pg;

        let migrations =
            <diesel_migrations::EmbeddedMigrations as MigrationSource<Pg>>::migrations(
                &super::MIGRATIONS,
            )
            .expect("embedded migrations should load");
        let harvest_initial_count = migrations
            .iter()
            .filter(|migration| migration.name().to_string().ends_with("_harvest_initial"))
            .count();

        assert_eq!(harvest_initial_count, 1);
    }

    #[test]
    fn parse_byte_size_handles_units_and_whitespace() {
        assert_eq!(parse_byte_size("1GiB"), Some(1 << 30));
        assert_eq!(parse_byte_size("1GB"), Some(1_000_000_000));
        assert_eq!(parse_byte_size(" 4 MB "), Some(4_000_000));
        assert_eq!(parse_byte_size("3KB"), Some(3_000));
        assert_eq!(parse_byte_size("512"), Some(512));
    }

    #[test]
    fn parse_byte_size_rejects_invalid_values() {
        assert_eq!(parse_byte_size(""), None);
        assert_eq!(parse_byte_size("MiB"), None);
        assert_eq!(parse_byte_size("1.5MiB"), None);
        assert_eq!(parse_byte_size("2kb"), None); // units are case-sensitive
        assert_eq!(parse_byte_size("18446744073709551615KiB"), None); // overflow
    }

    #[test]
    fn task_duration_parses_compound_values() {
        assert_eq!(task_duration("1h 30m"), Some(Duration::from_secs(5_400)));
        assert_eq!(task_duration("5s"), Some(Duration::from_secs(5)));
        assert_eq!(task_duration("2d"), Some(Duration::from_secs(172_800)));
    }

    #[test]
    fn task_duration_rejects_invalid_values() {
        assert_eq!(task_duration(""), None);
        assert_eq!(task_duration("5"), None);
        assert_eq!(task_duration("5x"), None);
    }

    #[test]
    fn task_duration_rejects_overflow() {
        assert_eq!(task_duration("18446744073709551615d"), None); // u64::MAX
        assert_eq!(task_duration("18446744073709551615h"), None); // u64::MAX
        assert_eq!(task_duration("18446744073709551615m"), None); // u64::MAX
        assert_eq!(task_duration("18446744073709551614s 2s"), None); // Add overflow
    }

    #[test]
    fn havoc_task_duration_oom_prevention() {
        // Attack: provide an excessively long run of digits. The parser must
        // reject it outright rather than buffering an unbounded number.
        let massive_input = "1".repeat(100);
        let result = task_duration(&massive_input);
        assert_eq!(result, None);
    }

    #[test]
    fn task_duration_allows_zero_padded_values() {
        let zero_padded = format!("{}1s", "0".repeat(50));
        assert_eq!(task_duration(&zero_padded), Some(Duration::from_secs(1)));
        // A zero total is rejected by task_duration, so "0s" yields None.
        assert_eq!(task_duration("000000000000s"), None);
        assert_eq!(task_duration("0000010m"), Some(Duration::from_secs(600)));
    }
}