temporalio_common/
worker.rs1use crate::protos::{
5 coresdk, temporal,
6 temporal::api::enums::v1::{TaskQueueType, VersioningBehavior},
7};
8use std::{
9 fs::File,
10 io::{self, BufReader, Read},
11 str::FromStr,
12 sync::OnceLock,
13};
14
15#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
19pub struct WorkerTaskTypes {
20 pub enable_workflows: bool,
22 pub enable_local_activities: bool,
24 pub enable_remote_activities: bool,
26 pub enable_nexus: bool,
28}
29
30impl WorkerTaskTypes {
31 pub fn is_empty(&self) -> bool {
33 !self.enable_workflows
34 && !self.enable_local_activities
35 && !self.enable_remote_activities
36 && !self.enable_nexus
37 }
38
39 pub fn all() -> WorkerTaskTypes {
41 WorkerTaskTypes {
42 enable_workflows: true,
43 enable_local_activities: true,
44 enable_remote_activities: true,
45 enable_nexus: true,
46 }
47 }
48
49 pub fn workflow_only() -> WorkerTaskTypes {
51 WorkerTaskTypes {
52 enable_workflows: true,
53 enable_local_activities: false,
54 enable_remote_activities: false,
55 enable_nexus: false,
56 }
57 }
58
59 pub fn activity_only() -> WorkerTaskTypes {
61 WorkerTaskTypes {
62 enable_workflows: false,
63 enable_local_activities: false,
64 enable_remote_activities: true,
65 enable_nexus: false,
66 }
67 }
68
69 pub fn nexus_only() -> WorkerTaskTypes {
71 WorkerTaskTypes {
72 enable_workflows: false,
73 enable_local_activities: false,
74 enable_remote_activities: false,
75 enable_nexus: true,
76 }
77 }
78
79 pub fn overlaps_with(&self, other: &WorkerTaskTypes) -> bool {
81 (self.enable_workflows && other.enable_workflows)
82 || (self.enable_local_activities && other.enable_local_activities)
83 || (self.enable_remote_activities && other.enable_remote_activities)
84 || (self.enable_nexus && other.enable_nexus)
85 }
86
87 pub fn to_task_queue_types(&self) -> Vec<TaskQueueType> {
89 let mut types = Vec::new();
90 if self.enable_workflows {
91 types.push(TaskQueueType::Workflow);
92 }
93 if self.enable_remote_activities {
94 types.push(TaskQueueType::Activity);
95 }
96 if self.enable_nexus {
97 types.push(TaskQueueType::Nexus);
98 }
99 types
100 }
101}
102
103#[derive(Clone, Debug, Eq, PartialEq, Hash)]
105pub struct WorkerDeploymentOptions {
106 pub version: WorkerDeploymentVersion,
108 pub use_worker_versioning: bool,
111 pub default_versioning_behavior: Option<VersioningBehavior>,
114}
115
116impl WorkerDeploymentOptions {
117 pub fn from_build_id(build_id: String) -> Self {
119 Self {
120 version: WorkerDeploymentVersion {
121 deployment_name: "".to_owned(),
122 build_id,
123 },
124 use_worker_versioning: false,
125 default_versioning_behavior: None,
126 }
127 }
128}
129
/// A worker deployment version, made up of a deployment name and a build ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WorkerDeploymentVersion {
    /// Name of the deployment.
    pub deployment_name: String,
    /// Build ID within the deployment.
    pub build_id: String,
}

impl WorkerDeploymentVersion {
    /// Returns `true` only when both the deployment name and the build ID are empty.
    pub fn is_empty(&self) -> bool {
        let Self {
            deployment_name,
            build_id,
        } = self;
        deployment_name.is_empty() && build_id.is_empty()
    }
}

/// Parses the `<deployment_name>.<build_id>` form.
///
/// The input is split at the first `.`: everything before it becomes the
/// deployment name and everything after it (including any further dots)
/// becomes the build ID. Either part may be empty. Input without a `.` is
/// rejected with `Err(())`.
impl FromStr for WorkerDeploymentVersion {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (deployment_name, build_id) = s.split_once('.').ok_or(())?;
        Ok(Self {
            deployment_name: deployment_name.to_owned(),
            build_id: build_id.to_owned(),
        })
    }
}
159
160impl From<WorkerDeploymentVersion> for coresdk::common::WorkerDeploymentVersion {
161 fn from(v: WorkerDeploymentVersion) -> coresdk::common::WorkerDeploymentVersion {
162 coresdk::common::WorkerDeploymentVersion {
163 deployment_name: v.deployment_name,
164 build_id: v.build_id,
165 }
166 }
167}
168
169impl From<coresdk::common::WorkerDeploymentVersion> for WorkerDeploymentVersion {
170 fn from(v: coresdk::common::WorkerDeploymentVersion) -> WorkerDeploymentVersion {
171 WorkerDeploymentVersion {
172 deployment_name: v.deployment_name,
173 build_id: v.build_id,
174 }
175 }
176}
177
178impl From<temporal::api::deployment::v1::WorkerDeploymentVersion> for WorkerDeploymentVersion {
179 fn from(v: temporal::api::deployment::v1::WorkerDeploymentVersion) -> Self {
180 Self {
181 deployment_name: v.deployment_name,
182 build_id: v.build_id,
183 }
184 }
185}
186
187static CACHED_BUILD_ID: OnceLock<String> = OnceLock::new();
188
189pub fn build_id_from_current_exe() -> &'static str {
192 CACHED_BUILD_ID
193 .get_or_init(|| compute_crc32_exe_id().unwrap_or_else(|_| "undetermined".to_owned()))
194}
195
196fn compute_crc32_exe_id() -> io::Result<String> {
197 let exe_path = std::env::current_exe()?;
198 let file = File::open(exe_path)?;
199 let mut reader = BufReader::new(file);
200
201 let mut hasher = crc32fast::Hasher::new();
202 let mut buf = [0u8; 128 * 1024];
203
204 loop {
205 let n = reader.read(&mut buf)?;
206 if n == 0 {
207 break;
208 }
209 hasher.update(&buf[..n]);
210 }
211
212 let crc = hasher.finalize();
213
214 Ok(format!("{:08x}", crc))
215}