Skip to main content

temporalio_common/
worker.rs

1//! Contains types that are needed by both the client and the sdk when configuring / interacting
2//! with workers.
3
4use crate::protos::{
5    coresdk, temporal,
6    temporal::api::enums::v1::{TaskQueueType, VersioningBehavior},
7};
8use std::{
9    fs::File,
10    io::{self, BufReader, Read},
11    str::FromStr,
12    sync::OnceLock,
13};
14
15/// Specifies which task types a worker will poll for.
16///
17/// Workers can be configured to handle any combination of workflows, activities, and nexus operations.
18#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
19pub struct WorkerTaskTypes {
20    /// Whether workflow tasks are enabled.
21    pub enable_workflows: bool,
22    /// Whether local activity tasks are enabled.
23    pub enable_local_activities: bool,
24    /// Whether remote activity tasks are enabled.
25    pub enable_remote_activities: bool,
26    /// Whether nexus tasks are enabled.
27    pub enable_nexus: bool,
28}
29
30impl WorkerTaskTypes {
31    /// Check if no task types are enabled
32    pub fn is_empty(&self) -> bool {
33        !self.enable_workflows
34            && !self.enable_local_activities
35            && !self.enable_remote_activities
36            && !self.enable_nexus
37    }
38
39    /// Create a config with all task types enabled
40    pub fn all() -> WorkerTaskTypes {
41        WorkerTaskTypes {
42            enable_workflows: true,
43            enable_local_activities: true,
44            enable_remote_activities: true,
45            enable_nexus: true,
46        }
47    }
48
49    /// Create a config with only workflow tasks enabled
50    pub fn workflow_only() -> WorkerTaskTypes {
51        WorkerTaskTypes {
52            enable_workflows: true,
53            enable_local_activities: false,
54            enable_remote_activities: false,
55            enable_nexus: false,
56        }
57    }
58
59    /// Create a config with only activity tasks enabled
60    pub fn activity_only() -> WorkerTaskTypes {
61        WorkerTaskTypes {
62            enable_workflows: false,
63            enable_local_activities: false,
64            enable_remote_activities: true,
65            enable_nexus: false,
66        }
67    }
68
69    /// Create a config with only nexus tasks enabled
70    pub fn nexus_only() -> WorkerTaskTypes {
71        WorkerTaskTypes {
72            enable_workflows: false,
73            enable_local_activities: false,
74            enable_remote_activities: false,
75            enable_nexus: true,
76        }
77    }
78
79    /// Returns true if any task type is enabled in both configs.
80    pub fn overlaps_with(&self, other: &WorkerTaskTypes) -> bool {
81        (self.enable_workflows && other.enable_workflows)
82            || (self.enable_local_activities && other.enable_local_activities)
83            || (self.enable_remote_activities && other.enable_remote_activities)
84            || (self.enable_nexus && other.enable_nexus)
85    }
86
87    /// Converts the enabled task types into the corresponding [`TaskQueueType`] values.
88    pub fn to_task_queue_types(&self) -> Vec<TaskQueueType> {
89        let mut types = Vec::new();
90        if self.enable_workflows {
91            types.push(TaskQueueType::Workflow);
92        }
93        if self.enable_remote_activities {
94            types.push(TaskQueueType::Activity);
95        }
96        if self.enable_nexus {
97            types.push(TaskQueueType::Nexus);
98        }
99        types
100    }
101}
102
103/// Configuration for worker deployment versioning.
104#[derive(Clone, Debug, Eq, PartialEq, Hash)]
105pub struct WorkerDeploymentOptions {
106    /// The deployment version of this worker.
107    pub version: WorkerDeploymentVersion,
108    /// If set, opts in to the Worker Deployment Versioning feature, meaning this worker will only
109    /// receive tasks for workflows it claims to be compatible with.
110    pub use_worker_versioning: bool,
111    /// The default versioning behavior to use for workflows that do not pass one to Core.
112    /// It is a startup-time error to specify `Some(Unspecified)` here.
113    pub default_versioning_behavior: Option<VersioningBehavior>,
114}
115
116impl WorkerDeploymentOptions {
117    /// Create deployment options from just a build ID, without opting into worker versioning.
118    pub fn from_build_id(build_id: String) -> Self {
119        Self {
120            version: WorkerDeploymentVersion {
121                deployment_name: "".to_owned(),
122                build_id,
123            },
124            use_worker_versioning: false,
125            default_versioning_behavior: None,
126        }
127    }
128}
129
130/// Identifies a specific version of a worker deployment.
131#[derive(Clone, Debug, Eq, PartialEq, Hash)]
132pub struct WorkerDeploymentVersion {
133    /// Name of the deployment
134    pub deployment_name: String,
135    /// Build ID for the worker.
136    pub build_id: String,
137}
138
139impl WorkerDeploymentVersion {
140    /// Returns true if both the deployment name and build ID are empty.
141    pub fn is_empty(&self) -> bool {
142        self.deployment_name.is_empty() && self.build_id.is_empty()
143    }
144}
145
146impl FromStr for WorkerDeploymentVersion {
147    type Err = ();
148
149    fn from_str(s: &str) -> Result<Self, Self::Err> {
150        match s.split_once('.') {
151            Some((name, build_id)) => Ok(WorkerDeploymentVersion {
152                deployment_name: name.to_owned(),
153                build_id: build_id.to_owned(),
154            }),
155            _ => Err(()),
156        }
157    }
158}
159
160impl From<WorkerDeploymentVersion> for coresdk::common::WorkerDeploymentVersion {
161    fn from(v: WorkerDeploymentVersion) -> coresdk::common::WorkerDeploymentVersion {
162        coresdk::common::WorkerDeploymentVersion {
163            deployment_name: v.deployment_name,
164            build_id: v.build_id,
165        }
166    }
167}
168
169impl From<coresdk::common::WorkerDeploymentVersion> for WorkerDeploymentVersion {
170    fn from(v: coresdk::common::WorkerDeploymentVersion) -> WorkerDeploymentVersion {
171        WorkerDeploymentVersion {
172            deployment_name: v.deployment_name,
173            build_id: v.build_id,
174        }
175    }
176}
177
178impl From<temporal::api::deployment::v1::WorkerDeploymentVersion> for WorkerDeploymentVersion {
179    fn from(v: temporal::api::deployment::v1::WorkerDeploymentVersion) -> Self {
180        Self {
181            deployment_name: v.deployment_name,
182            build_id: v.build_id,
183        }
184    }
185}
186
187static CACHED_BUILD_ID: OnceLock<String> = OnceLock::new();
188
189/// Build ID derived from hashing the on-disk bytes of the current executable.
190/// Deterministic across machines for the same binary. Cached per-process.
191pub fn build_id_from_current_exe() -> &'static str {
192    CACHED_BUILD_ID
193        .get_or_init(|| compute_crc32_exe_id().unwrap_or_else(|_| "undetermined".to_owned()))
194}
195
196fn compute_crc32_exe_id() -> io::Result<String> {
197    let exe_path = std::env::current_exe()?;
198    let file = File::open(exe_path)?;
199    let mut reader = BufReader::new(file);
200
201    let mut hasher = crc32fast::Hasher::new();
202    let mut buf = [0u8; 128 * 1024];
203
204    loop {
205        let n = reader.read(&mut buf)?;
206        if n == 0 {
207            break;
208        }
209        hasher.update(&buf[..n]);
210    }
211
212    let crc = hasher.finalize();
213
214    Ok(format!("{:08x}", crc))
215}