Skip to main content

temporalio_common/
worker.rs

1//! Contains types that are needed by both the client and the sdk when configuring / interacting
2//! with workers.
3
4use crate::protos::temporal::api::enums::v1::{TaskQueueType, VersioningBehavior};
5use std::{
6    fs::File,
7    io::{self, BufReader, Read},
8    sync::OnceLock,
9};
10pub use temporalio_common_wasm::worker::WorkerDeploymentVersion;
11
12/// Specifies which task types a worker will poll for.
13///
14/// Workers can be configured to handle any combination of workflows, activities, and nexus operations.
15#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
16pub struct WorkerTaskTypes {
17    /// Whether workflow tasks are enabled.
18    pub enable_workflows: bool,
19    /// Whether local activity tasks are enabled.
20    pub enable_local_activities: bool,
21    /// Whether remote activity tasks are enabled.
22    pub enable_remote_activities: bool,
23    /// Whether nexus tasks are enabled.
24    pub enable_nexus: bool,
25}
26
27impl WorkerTaskTypes {
28    /// Check if no task types are enabled
29    pub fn is_empty(&self) -> bool {
30        !self.enable_workflows
31            && !self.enable_local_activities
32            && !self.enable_remote_activities
33            && !self.enable_nexus
34    }
35
36    /// Create a config with all task types enabled
37    pub fn all() -> WorkerTaskTypes {
38        WorkerTaskTypes {
39            enable_workflows: true,
40            enable_local_activities: true,
41            enable_remote_activities: true,
42            enable_nexus: true,
43        }
44    }
45
46    /// Create a config with only workflow tasks enabled
47    pub fn workflow_only() -> WorkerTaskTypes {
48        WorkerTaskTypes {
49            enable_workflows: true,
50            enable_local_activities: false,
51            enable_remote_activities: false,
52            enable_nexus: false,
53        }
54    }
55
56    /// Create a config with only activity tasks enabled
57    pub fn activity_only() -> WorkerTaskTypes {
58        WorkerTaskTypes {
59            enable_workflows: false,
60            enable_local_activities: false,
61            enable_remote_activities: true,
62            enable_nexus: false,
63        }
64    }
65
66    /// Create a config with only nexus tasks enabled
67    pub fn nexus_only() -> WorkerTaskTypes {
68        WorkerTaskTypes {
69            enable_workflows: false,
70            enable_local_activities: false,
71            enable_remote_activities: false,
72            enable_nexus: true,
73        }
74    }
75
76    /// Returns true if any task type is enabled in both configs.
77    pub fn overlaps_with(&self, other: &WorkerTaskTypes) -> bool {
78        (self.enable_workflows && other.enable_workflows)
79            || (self.enable_local_activities && other.enable_local_activities)
80            || (self.enable_remote_activities && other.enable_remote_activities)
81            || (self.enable_nexus && other.enable_nexus)
82    }
83
84    /// Converts the enabled task types into the corresponding [`TaskQueueType`] values.
85    pub fn to_task_queue_types(&self) -> Vec<TaskQueueType> {
86        let mut types = Vec::new();
87        if self.enable_workflows {
88            types.push(TaskQueueType::Workflow);
89        }
90        if self.enable_remote_activities {
91            types.push(TaskQueueType::Activity);
92        }
93        if self.enable_nexus {
94            types.push(TaskQueueType::Nexus);
95        }
96        types
97    }
98}
99
100/// Configuration for worker deployment versioning.
101#[derive(Clone, Debug, Eq, PartialEq, Hash)]
102pub struct WorkerDeploymentOptions {
103    /// The deployment version of this worker.
104    pub version: WorkerDeploymentVersion,
105    /// If set, opts in to the Worker Deployment Versioning feature, meaning this worker will only
106    /// receive tasks for workflows it claims to be compatible with.
107    pub use_worker_versioning: bool,
108    /// The default versioning behavior to use for workflows that do not pass one to Core.
109    /// It is a startup-time error to specify `Some(Unspecified)` here.
110    pub default_versioning_behavior: Option<VersioningBehavior>,
111}
112
113impl WorkerDeploymentOptions {
114    /// Create deployment options from just a build ID, without opting into worker versioning.
115    pub fn from_build_id(build_id: String) -> Self {
116        Self {
117            version: WorkerDeploymentVersion {
118                deployment_name: "".to_owned(),
119                build_id,
120            },
121            use_worker_versioning: false,
122            default_versioning_behavior: None,
123        }
124    }
125}
126
127static CACHED_BUILD_ID: OnceLock<String> = OnceLock::new();
128
129/// Build ID derived from hashing the on-disk bytes of the current executable.
130/// Deterministic across machines for the same binary. Cached per-process.
131pub fn build_id_from_current_exe() -> &'static str {
132    CACHED_BUILD_ID
133        .get_or_init(|| compute_crc32_exe_id().unwrap_or_else(|_| "undetermined".to_owned()))
134}
135
136fn compute_crc32_exe_id() -> io::Result<String> {
137    let exe_path = std::env::current_exe()?;
138    let file = File::open(exe_path)?;
139    let mut reader = BufReader::new(file);
140
141    let mut hasher = crc32fast::Hasher::new();
142    let mut buf = [0u8; 128 * 1024];
143
144    loop {
145        let n = reader.read(&mut buf)?;
146        if n == 0 {
147            break;
148        }
149        hasher.update(&buf[..n]);
150    }
151
152    let crc = hasher.finalize();
153
154    Ok(format!("{:08x}", crc))
155}