/// Emits the wrapped items unchanged.
///
/// This is the variant compiled when this crate's `db` feature is enabled:
/// every item passed to the macro is expanded verbatim at the call site.
#[cfg(feature = "db")]
#[macro_export]
macro_rules! cfg_db {
    ( $( $decl:item )* ) => { $( $decl )* };
}

/// Discards the wrapped items.
///
/// This is the variant compiled when this crate's `db` feature is disabled:
/// the macro still accepts any sequence of items but expands to nothing.
#[cfg(not(feature = "db"))]
#[macro_export]
macro_rules! cfg_db {
    ( $( $decl:item )* ) => {};
}
26
/// Database migrations embedded into the crate at compile time by
/// `diesel_migrations::embed_migrations!`.
///
/// The macro is invoked without a path argument, so it relies on
/// `diesel_migrations`' default migrations-directory lookup.
/// Only compiled when the `db` feature is enabled.
#[cfg(feature = "db")]
pub const MIGRATIONS: diesel_migrations::EmbeddedMigrations =
    diesel_migrations::embed_migrations!();
30
// Compile-time read of the `HARVEST_MIGRATIONS_LIST` environment variable; `env!` turns a
// missing variable into a build error. The constant is not referenced in the visible part
// of this file (hence the leading underscore).
// NOTE(review): presumably exported by the build script so that a change to the migration
// set forces this crate to recompile — confirm against build.rs.
#[cfg(feature = "db")]
const _MIGRATIONS_LIST: &str = env!("HARVEST_MIGRATIONS_LIST");
37
38pub mod admission_gate;
40pub mod analyzer;
42#[cfg(feature = "db")]
44pub mod audit;
45pub mod batch;
47pub mod batch_start;
49#[cfg(feature = "db")]
51pub mod build_routing;
52pub mod builder;
53pub mod cache;
54pub mod calendar;
57pub mod circuit_breaker;
60pub mod completion_trigger;
61pub mod concurrency;
63pub mod context;
64pub mod critical_path;
65pub mod dag;
66pub mod dag_export;
68pub mod dag_linter;
69#[cfg(feature = "testing")]
70pub mod dag_profiler;
71#[cfg(any(test, feature = "testing"))]
72pub mod dag_simulator;
73pub mod det_check;
75pub mod diagnostic;
76pub mod eligibility;
77pub mod error;
78pub mod event;
79#[cfg(feature = "db")]
80#[doc(hidden)]
81pub mod execution;
82pub mod executor;
83#[cfg(feature = "db")]
84pub mod external_task;
85pub mod failure;
86pub mod guardrail;
88#[cfg(feature = "db")]
89pub mod handle;
90#[cfg(feature = "db")]
91pub mod handle_typed;
92pub mod history_export;
93pub mod info;
94#[cfg(feature = "metrics-rs")]
102pub mod metrics_rs_adapter;
103pub mod payload_codec;
104pub mod poison_pill;
105pub mod policy;
106pub mod pool;
107pub mod prelude;
108pub mod query;
110pub mod replay;
111#[cfg(feature = "db")]
112pub mod reset;
113pub mod retention;
114pub mod saga;
115pub mod shard;
116pub mod simulator;
117pub mod telemetry;
119#[cfg(any(test, feature = "testing"))]
120pub mod test_generator;
121#[cfg(any(test, feature = "testing"))]
123pub mod testing;
124pub mod types;
125pub mod update;
126#[cfg(feature = "db")]
127pub mod version_gate_retirement;
128#[cfg(feature = "db")]
129pub mod version_usage;
130
131#[cfg(feature = "db")]
132#[doc(hidden)]
133pub mod dlq;
134#[cfg(feature = "db")]
135#[doc(hidden)]
136pub mod heartbeat;
137#[cfg(feature = "db")]
138#[doc(hidden)]
139pub mod models;
140#[cfg(feature = "db")]
141#[doc(hidden)]
142pub mod notify;
143#[cfg(feature = "db")]
144#[doc(hidden)]
145pub mod queue;
146#[cfg(feature = "db")]
147pub mod schedule_decision;
148#[cfg(feature = "db")]
149pub mod scheduler;
150#[cfg(feature = "db")]
151#[allow(clippy::wildcard_imports)]
152#[doc(hidden)]
153pub mod schema;
154#[cfg(feature = "db")]
155#[doc(hidden)]
156pub mod signal;
157#[cfg(feature = "db")]
158#[doc(hidden)]
159pub mod store;
160#[cfg(feature = "db")]
161#[doc(hidden)]
162pub mod timeout;
163#[cfg(feature = "db")]
164#[doc(hidden)]
165pub mod worker;
166#[cfg(feature = "db")]
167pub mod workers;
168
169pub use admission_gate::{
170 AdmissionGate, AdmissionGateCache, AdmissionGateId, AdmissionGateView, GateScope,
171 MAX_ACTIVE_GATES, check_admission,
172};
173pub use analyzer::{
174 AnalyzerRule, AnalyzerWarning, ExcessiveRetriesRule, HistoryAnalyzer, LargePayloadRule,
175 SuspiciousTimerRule,
176};
177pub use builder::{
178 BuiltHarvest, HarvestBuilder, HarvestBuilderError, StickyRoutingConfig, WorkerConfig,
179};
180pub use cache::{CachedWorkflowState, WorkflowCache};
181#[cfg(feature = "db")]
182pub use calendar::{
183 BackfillSlot, create_calendar, delete_calendar, get_calendar, list_calendars,
184 load_exclusions_for_calendar, plan_backfill_with_calendar, preview_schedule_firings,
185 replace_calendar_exclusions,
186};
187pub use calendar::{
188 Calendar, ScheduleFirePreview, apply_skip_policy, calendar_excludes_weekends, is_excluded_date,
189};
190pub use completion_trigger::{CompletionTrigger, InputMapping, TerminalState};
191pub use context::{
192 ActivityContext, DEFAULT_HISTORY_CONTINUE_AS_NEW_THRESHOLD, WorkflowCommand, WorkflowContext,
193 WorkflowHistoryPolicy,
194};
195pub use critical_path::{CriticalPathAnalyzer, CriticalPathResult};
196pub use dag::{
197 DagBuildError, DagBuilder, DagCondition, DagDefinition, DagDispatchDecision, DagMapTaskRef,
198 DagTask, DagTaskRef,
199};
200pub use dag_export::{export_dot, export_mermaid};
201pub use dag_linter::{
202 DagLinter, DagRule, DagWarning, ExcessiveParallelismRule, MissingRetryPolicyRule,
203 MissingTimeoutRule,
204};
205#[cfg(feature = "testing")]
206pub use dag_profiler::{DagProfile, DagProfiler, ProfilerEvent, ProfilerEventKind};
207#[cfg(any(test, feature = "testing"))]
208pub use dag_simulator::{DagSimulator, DagSimulatorResult};
209pub use det_check::{
210 DetCheckReport, DetFinding, DetLocation, DetSeverity, DetSuppression, check_dir, check_file,
211 check_source,
212};
213pub use diagnostic::{DiagnosticReport, SimulatorResultExt};
214pub use error::{HarvestError, HarvestResult, TimeoutType};
215pub use event::{SideEffectKind, WorkflowEvent};
216#[cfg(feature = "db")]
217pub use execution::{
218 CancelledWorkflowExecution, PausedWorkflowExecution, ResumedWorkflowExecution,
219 SignalWithStartOutcome, SignalWithStartParams, StartWorkflowParams, StartedWorkflowExecution,
220 UpdateWithStartOutcome, UpdateWithStartParams, auto_resume_expired_pauses,
221 cancel_workflow_execution, pause_workflow_execution, resume_workflow_execution,
222 signal_with_start_workflow_execution, start_or_load_workflow_execution,
223 terminate_workflow_execution, update_with_start_workflow_execution,
224};
225pub use executor::{WorkflowOutcome, run_workflow};
226pub use guardrail::{
227 GuardrailFinding, GuardrailSuppression, GuardrailSuppressionError, RuleCategory, RuleEntry,
228 Severity, catalog as guardrail_catalog, rule_by_id as guardrail_rule_by_id,
229};
230#[cfg(feature = "db")]
231pub use handle::{
232 StartedWorkflowHandle, WorkflowHandle, WorkflowHandleClient, WorkflowResult,
233 WorkflowResultState, start_or_load_workflow_execution_with_handle,
234};
235#[cfg(feature = "db")]
236pub use handle_typed::{
237 TypedSignalWithStartOptions, TypedStartOptions, TypedUpdateWithStartOptions,
238 TypedWorkflowHandle, TypedWorkflowResult,
239};
240pub use history_export::{
241 DEFAULT_HISTORY_EXPORT_MAX_BYTES, HISTORY_EXPORT_SCHEMA, HISTORY_EXPORT_VERSION,
242 HistoryExportDocument, HistoryExportError, HistoryExportRequest, HistoryExportSizeLimit,
243 HistoryExportStatus, HistoryPayloadPolicy, export_history, export_mermaid_sequence,
244};
245pub use info::{
246 ActivityHandlerFn, ActivityInfo, DagInfo, QueryHandlerFn, QueryHandlerInfo, UpdateHandlerFn,
247 UpdateHandlerInfo, UpdateValidatorFn, WorkflowHandlerFn, WorkflowInfo,
248};
249pub use payload_codec::{CodecError, IdentityCodec, PayloadCodec, PayloadCodecs};
250pub use policy::validate_schedule;
251pub use policy::{
252 CatchupPolicy, MapFailurePolicy, OverlapPolicy, RetryPolicy, Schedule, SkipPolicy, TaskStatus,
253 TriggerRule, WorkflowSchedule,
254};
255pub use pool::{HarvestPoolConfig, compute_pool_sizes};
256pub use query::QueryRegistry;
257pub use replay::{HistoryMatch, HistoryMatcher, SignalOrTimerMatch};
258#[cfg(feature = "db")]
259pub use reset::{
260 ResetInvalidPoint, ResetPlan, ResetResult, ResetSignalReapplyPolicy, ResetUnresolvedSideEffect,
261 WorkflowResetError, WorkflowResetRequest, preview_workflow_reset, reset_workflow_execution,
262 validate_reset_point,
263};
264pub use retention::{ArchiverFuture, HistoryArchiver, RetentionConfig};
265#[cfg(feature = "db")]
266pub use retention::{RetentionMonitor, RetentionRuntime, RetentionStatus, RetentionTickResult};
267pub use saga::Saga;
268#[cfg(feature = "db")]
269pub use schedule_decision::record_decision_graceful;
270#[cfg(feature = "db")]
271pub use scheduler::{
272 DagCatalog, RegisteredDag, SchedulerMonitor, SchedulerRuntime, compile_dag_catalog,
273 register_schedules, register_workflow_schedules, tick_once, trigger_unified_dag,
274};
275pub use shard::ShardRouter;
276#[cfg(feature = "db")]
277pub use shard::ShardedDbPool;
278pub use simulator::{SimulatorResult, WorkflowSimulator};
279#[cfg(feature = "db")]
280pub use store::AwaitMode;
281pub use telemetry::{
282 ActivityStatus, MetricsRecorder, NoOpMetrics, NoOpPropagator, TelemetryConfig,
283 TelemetryConfigBuilder, TraceContextCarrier, TraceContextPropagator, WorkflowStatus,
284};
285#[cfg(any(test, feature = "testing"))]
286pub use test_generator::TestHarnessGenerator;
287#[cfg(feature = "testing")]
288pub use testing::{
289 BatchReplayReport, CiReport, FailOnMode, FixtureResult, FixtureStatus, HarnessErrorKind,
290 ReplayVerifier, ReportFormat, TestRunOutcome, WorkflowTestEnv,
291};
292#[cfg(any(test, feature = "testing"))]
293pub use testing::{
294 HistorySnapshot, NonDeterminismKind, ReplayReport, ReplayStatus, WorkflowReplayer,
295};
296pub use types::{
297 ActivityExecId, BuildId, DeploymentName, ExecutionId, ExternalActivityToken, ExternalCancelId,
298 ExternalSignalId, ParentClosePolicy, Priority, ShardId, TimerId, UpdateId, WorkerId,
299 WorkflowId, WorkflowIdReusePolicy,
300};
301pub use update::UpdateRegistry;
302#[cfg(feature = "db")]
303pub use version_usage::{
304 VersionExecutionStateGroup, VersionUsageFilters, VersionUsageShardRow, load_version_usage,
305};
306
307#[cfg(feature = "db")]
308pub use store::EventHistory;
309
310#[cfg(feature = "db")]
311pub use models::{AuditRecord, NewAuditRecord};
312
313#[cfg(feature = "db")]
314pub use diesel;
315#[cfg(feature = "db")]
316pub use diesel_async;
317
318#[cfg(feature = "db")]
319pub use queue::{ConcurrencyKeyStats, QueueScalingSignal, QueueTaskCounts, queue_task_counts};
320
321pub use serde_json;
323pub use serde;
325pub use chrono;
327pub use uuid;
329#[doc(hidden)]
331pub use futures;
332
/// Parses a human-readable byte size such as `"512"`, `"10 MiB"` or `"2GB"`.
///
/// Leading and trailing whitespace is ignored, as is whitespace between the
/// number and its unit. The recognised (case-sensitive) suffixes are the binary
/// units `KiB`, `MiB`, `GiB` (powers of 1024) and the decimal units `KB`, `MB`,
/// `GB` (powers of 1000). A value without a suffix is a plain byte count.
///
/// Returns `None` when the input is empty, when the numeric part is not a
/// valid `u64`, or when applying the unit would overflow `u64`.
#[must_use]
pub fn parse_byte_size(s: &str) -> Option<u64> {
    /// Recognised unit suffixes and the number of bytes each one stands for.
    const UNITS: [(&str, u64); 6] = [
        ("GiB", 1 << 30),
        ("GB", 1_000_000_000),
        ("MiB", 1 << 20),
        ("MB", 1_000_000),
        ("KiB", 1 << 10),
        ("KB", 1_000),
    ];

    let text = s.trim();
    if text.is_empty() {
        return None;
    }

    // First matching suffix wins; with no suffix the whole text is the number.
    let (number, scale) = UNITS
        .iter()
        .find_map(|&(suffix, scale)| text.strip_suffix(suffix).map(|rest| (rest.trim(), scale)))
        .unwrap_or((text, 1));

    number.parse::<u64>().ok()?.checked_mul(scale)
}
371
/// Parses a compact duration string such as `"1h 30m"` into a
/// [`std::time::Duration`].
///
/// The input is a sequence of `<number><unit>` segments, where the unit is one
/// of `s` (seconds), `m` (minutes), `h` (hours) or `d` (days). ASCII spaces may
/// appear around segments and between a number and its unit. Leading zeros in
/// a number are accepted.
///
/// Returns `None` when:
/// - the input contains anything other than ASCII digits, ASCII letters and
///   spaces, or a letter that is not one of the four units;
/// - a unit has no number in front of it, or a number has no unit after it;
/// - a number is split by spaces (`"1 30m"`), which would otherwise be read
///   silently as `130m`;
/// - a number, a segment, or the running total overflows `u64` seconds;
/// - the total is zero seconds.
#[doc(hidden)]
#[must_use]
pub fn task_duration(s: &str) -> Option<std::time::Duration> {
    let mut total_secs = 0u64;
    // Value of the number currently being read; `None` between segments.
    let mut pending: Option<u64> = None;
    // Set once a space follows the digits of `pending`; only a unit may come next.
    let mut gap_after_digits = false;

    for ch in s.chars() {
        if let Some(digit) = ch.to_digit(10) {
            if gap_after_digits {
                // Digits on both sides of a space: reject instead of gluing them together.
                return None;
            }
            // Accumulate numerically so oversized numbers fail fast without buffering.
            let widened = pending.unwrap_or(0).checked_mul(10)?;
            pending = Some(widened.checked_add(u64::from(digit))?);
        } else if ch.is_ascii_alphabetic() {
            // A unit must close a number; `take` also resets state for the next segment.
            let amount = pending.take()?;
            gap_after_digits = false;
            let unit_secs: u64 = match ch {
                's' => 1,
                'm' => 60,
                'h' => 3_600,
                'd' => 86_400,
                _ => return None,
            };
            total_secs = total_secs.checked_add(amount.checked_mul(unit_secs)?)?;
        } else if ch == ' ' {
            gap_after_digits = pending.is_some();
        } else {
            return None;
        }
    }

    // A trailing number without a unit, or an overall zero duration, is invalid.
    if pending.is_some() || total_secs == 0 {
        return None;
    }

    Some(std::time::Duration::from_secs(total_secs))
}
411
#[cfg(test)]
mod tests {
    use super::task_duration;
    use std::time::Duration;

    /// The embedded migration set must contain exactly one migration whose
    /// name ends in `_harvest_initial`.
    #[cfg(feature = "db")]
    #[test]
    fn embedded_migrations_include_one_harvest_initial() {
        use diesel::migration::MigrationSource;
        use diesel::pg::Pg;

        let migrations = MigrationSource::<Pg>::migrations(&super::MIGRATIONS)
            .expect("embedded migrations should load");

        let harvest_initial_count = migrations
            .iter()
            .map(|migration| migration.name().to_string())
            .filter(|name| name.ends_with("_harvest_initial"))
            .count();

        assert_eq!(harvest_initial_count, 1);
    }

    /// Single and multi-segment inputs add up to the expected number of seconds.
    #[test]
    fn task_duration_parses_compound_values() {
        let cases = [("1h 30m", 5_400), ("5s", 5), ("2d", 172_800)];
        for (input, secs) in cases {
            assert_eq!(
                task_duration(input),
                Some(Duration::from_secs(secs)),
                "input: {input:?}"
            );
        }
    }

    /// Empty input, a number without a unit, and an unknown unit are rejected.
    #[test]
    fn task_duration_rejects_invalid_values() {
        for input in ["", "5", "5x"] {
            assert_eq!(task_duration(input), None, "input: {input:?}");
        }
    }

    /// Overflow in a single segment or in the running total yields `None`.
    #[test]
    fn task_duration_rejects_overflow() {
        let overflowing = [
            "18446744073709551615d",
            "18446744073709551615h",
            "18446744073709551615m",
            "18446744073709551614s 2s",
        ];
        for input in overflowing {
            assert_eq!(task_duration(input), None, "input: {input:?}");
        }
    }

    /// A very long run of digits is rejected rather than parsed.
    #[test]
    fn havoc_task_duration_oom_prevention() {
        let digits_only = "1".repeat(100);
        assert_eq!(task_duration(&digits_only), None);
    }

    /// Leading zeros do not count against the number; an all-zero value is
    /// still rejected as a zero duration.
    #[test]
    fn task_duration_allows_zero_padded_values() {
        let mut zero_padded = "0".repeat(50);
        zero_padded.push_str("1s");

        assert_eq!(task_duration(&zero_padded), Some(Duration::from_secs(1)));
        assert_eq!(task_duration("000000000000s"), None);
        assert_eq!(task_duration("0000010m"), Some(Duration::from_secs(600)));
    }
}