1use crate::model::oplog::{IndexedResourceKey, TimestampedUpdateDescription, WorkerResourceId};
16use crate::model::regions::DeletedRegions;
17use bincode::de::{BorrowDecoder, Decoder};
18use bincode::enc::Encoder;
19use bincode::error::{DecodeError, EncodeError};
20use bincode::{BorrowDecode, Decode, Encode};
21
22pub use crate::base_model::*;
23use crate::model::invocation_context::InvocationContextStack;
24use golem_wasm_ast::analysis::analysed_type::{field, list, record, str, tuple, u32, u64};
25use golem_wasm_ast::analysis::AnalysedType;
26use golem_wasm_rpc::{IntoValue, Value};
27use golem_wasm_rpc_derive::IntoValue;
28use http::Uri;
29use rand::prelude::IteratorRandom;
30use serde::de::Unexpected;
31use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
32use std::cmp::Ordering;
33use std::collections::{HashMap, HashSet, VecDeque};
34use std::fmt::{Display, Formatter};
35use std::ops::Add;
36use std::str::FromStr;
37use std::time::{Duration, SystemTime};
38use typed_path::Utf8UnixPathBuf;
39use uuid::{uuid, Uuid};
40
41pub mod auth;
42pub mod base64;
43pub mod component;
44pub mod component_constraint;
45pub mod component_metadata;
46pub mod config;
47pub mod error;
48pub mod exports;
49pub mod invocation_context;
50pub mod lucene;
51pub mod oplog;
52pub mod plugin;
53pub mod project;
54pub mod public_oplog;
55pub mod regions;
56pub mod trim_date;
57
58#[cfg(feature = "poem")]
59mod poem;
60
61#[cfg(feature = "protobuf")]
62pub mod protobuf;
63
64#[cfg(feature = "poem")]
65pub trait PoemTypeRequirements:
66 poem_openapi::types::Type + poem_openapi::types::ParseFromJSON + poem_openapi::types::ToJSON
67{
68}
69
70#[cfg(not(feature = "poem"))]
71pub trait PoemTypeRequirements {}
72
73#[cfg(feature = "poem")]
74impl<
75 T: poem_openapi::types::Type
76 + poem_openapi::types::ParseFromJSON
77 + poem_openapi::types::ToJSON,
78 > PoemTypeRequirements for T
79{
80}
81
82#[cfg(not(feature = "poem"))]
83impl<T> PoemTypeRequirements for T {}
84
85#[cfg(feature = "poem")]
86pub trait PoemMultipartTypeRequirements: poem_openapi::types::ParseFromMultipartField {}
87
88#[cfg(not(feature = "poem"))]
89pub trait PoemMultipartTypeRequirements {}
90
91#[cfg(feature = "poem")]
92impl<T: poem_openapi::types::ParseFromMultipartField> PoemMultipartTypeRequirements for T {}
93
94#[cfg(not(feature = "poem"))]
95impl<T> PoemMultipartTypeRequirements for T {}
96
97#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
98#[repr(transparent)]
99pub struct Timestamp(iso8601_timestamp::Timestamp);
100
101impl Timestamp {
102 pub fn now_utc() -> Timestamp {
103 Timestamp(iso8601_timestamp::Timestamp::now_utc())
104 }
105
106 pub fn to_millis(&self) -> u64 {
107 self.0
108 .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
109 .whole_milliseconds() as u64
110 }
111}
112
113impl Display for Timestamp {
114 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115 write!(f, "{}", self.0)
116 }
117}
118
119impl FromStr for Timestamp {
120 type Err = String;
121
122 fn from_str(s: &str) -> Result<Self, Self::Err> {
123 match iso8601_timestamp::Timestamp::parse(s) {
124 Some(ts) => Ok(Self(ts)),
125 None => Err("Invalid timestamp".to_string()),
126 }
127 }
128}
129
130impl serde::Serialize for Timestamp {
131 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
132 where
133 S: Serializer,
134 {
135 self.0.serialize(serializer)
136 }
137}
138
139impl<'de> serde::Deserialize<'de> for Timestamp {
140 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
141 where
142 D: Deserializer<'de>,
143 {
144 if deserializer.is_human_readable() {
145 iso8601_timestamp::Timestamp::deserialize(deserializer).map(Self)
146 } else {
147 let timestamp = i64::deserialize(deserializer)?;
149 Ok(Timestamp(
150 iso8601_timestamp::Timestamp::UNIX_EPOCH
151 .add(Duration::from_millis(timestamp as u64)),
152 ))
153 }
154 }
155}
156
157impl bincode::Encode for Timestamp {
158 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
159 (self
160 .0
161 .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
162 .whole_milliseconds() as i64)
163 .encode(encoder)
164 }
165}
166
167impl<Context> bincode::Decode<Context> for Timestamp {
168 fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
169 let timestamp: i64 = bincode::Decode::decode(decoder)?;
170 Ok(Timestamp(
171 iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
172 ))
173 }
174}
175
176impl<'de, Context> bincode::BorrowDecode<'de, Context> for Timestamp {
177 fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
178 decoder: &mut D,
179 ) -> Result<Self, DecodeError> {
180 let timestamp: i64 = bincode::BorrowDecode::borrow_decode(decoder)?;
181 Ok(Timestamp(
182 iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
183 ))
184 }
185}
186
187impl From<u64> for Timestamp {
188 fn from(value: u64) -> Self {
189 Timestamp(iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(value)))
190 }
191}
192
193impl IntoValue for Timestamp {
194 fn into_value(self) -> Value {
195 let d = self
196 .0
197 .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH);
198 Value::Record(vec![
199 Value::U64(d.whole_seconds() as u64),
200 Value::U32(d.subsec_nanoseconds() as u32),
201 ])
202 }
203
204 fn get_type() -> AnalysedType {
205 record(vec![field("seconds", u64()), field("nanoseconds", u32())])
206 }
207}
208
209#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode)]
211pub struct OwnedWorkerId {
212 pub project_id: ProjectId,
213 pub worker_id: WorkerId,
214}
215
216impl OwnedWorkerId {
217 pub fn new(project_id: &ProjectId, worker_id: &WorkerId) -> Self {
218 Self {
219 project_id: project_id.clone(),
220 worker_id: worker_id.clone(),
221 }
222 }
223
224 pub fn worker_id(&self) -> WorkerId {
225 self.worker_id.clone()
226 }
227
228 pub fn project_id(&self) -> ProjectId {
229 self.project_id.clone()
230 }
231
232 pub fn component_id(&self) -> ComponentId {
233 self.worker_id.component_id.clone()
234 }
235
236 pub fn worker_name(&self) -> String {
237 self.worker_id.worker_name.clone()
238 }
239}
240
241impl Display for OwnedWorkerId {
242 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243 write!(f, "{}/{}", self.project_id, self.worker_id)
244 }
245}
246
247impl AsRef<WorkerId> for OwnedWorkerId {
248 fn as_ref(&self) -> &WorkerId {
249 &self.worker_id
250 }
251}
252
253#[derive(Debug, Clone, PartialEq, Encode, Decode)]
255pub enum ScheduledAction {
256 CompletePromise {
258 account_id: AccountId,
259 project_id: ProjectId,
260 promise_id: PromiseId,
261 },
262 ArchiveOplog {
266 account_id: AccountId,
267 owned_worker_id: OwnedWorkerId,
268 last_oplog_index: OplogIndex,
269 next_after: Duration,
270 },
271 Invoke {
274 account_id: AccountId,
275 owned_worker_id: OwnedWorkerId,
276 idempotency_key: IdempotencyKey,
277 full_function_name: String,
278 function_input: Vec<Value>,
279 invocation_context: InvocationContextStack,
280 },
281}
282
283impl ScheduledAction {
284 pub fn owned_worker_id(&self) -> OwnedWorkerId {
285 match self {
286 ScheduledAction::CompletePromise {
287 project_id,
288 promise_id,
289 ..
290 } => OwnedWorkerId::new(project_id, &promise_id.worker_id),
291 ScheduledAction::ArchiveOplog {
292 owned_worker_id, ..
293 } => owned_worker_id.clone(),
294 ScheduledAction::Invoke {
295 owned_worker_id, ..
296 } => owned_worker_id.clone(),
297 }
298 }
299}
300
301impl Display for ScheduledAction {
302 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303 match self {
304 ScheduledAction::CompletePromise { promise_id, .. } => {
305 write!(f, "complete[{promise_id}]")
306 }
307 ScheduledAction::ArchiveOplog {
308 owned_worker_id, ..
309 } => {
310 write!(f, "archive[{owned_worker_id}]")
311 }
312 ScheduledAction::Invoke {
313 owned_worker_id, ..
314 } => write!(f, "invoke[{owned_worker_id}]"),
315 }
316 }
317}
318
319#[derive(Debug, Clone, Encode, Decode)]
320pub struct ScheduleId {
321 pub timestamp: i64,
322 pub action: ScheduledAction,
323}
324
325impl Display for ScheduleId {
326 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
327 write!(f, "{}@{}", self.action, self.timestamp)
328 }
329}
330
331#[derive(Clone)]
332pub struct NumberOfShards {
333 pub value: usize,
334}
335
336#[derive(Clone, Debug, PartialEq, Eq, Hash)]
337pub struct Pod {
338 host: String,
339 port: u16,
340}
341
342impl Pod {
343 pub fn uri(&self) -> Uri {
344 Uri::builder()
345 .scheme("http")
346 .authority(format!("{}:{}", self.host, self.port).as_str())
347 .path_and_query("/")
348 .build()
349 .expect("Failed to build URI")
350 }
351}
352
353#[derive(Clone)]
354pub struct RoutingTable {
355 pub number_of_shards: NumberOfShards,
356 shard_assignments: HashMap<ShardId, Pod>,
357}
358
359impl RoutingTable {
360 pub fn lookup(&self, worker_id: &WorkerId) -> Option<&Pod> {
361 self.shard_assignments.get(&ShardId::from_worker_id(
362 &worker_id.clone(),
363 self.number_of_shards.value,
364 ))
365 }
366
367 pub fn random(&self) -> Option<&Pod> {
368 self.shard_assignments.values().choose(&mut rand::rng())
369 }
370
371 pub fn first(&self) -> Option<&Pod> {
372 self.shard_assignments.values().next()
373 }
374
375 pub fn all(&self) -> HashSet<&Pod> {
376 self.shard_assignments.values().collect()
377 }
378}
379
380#[allow(dead_code)]
381pub struct RoutingTableEntry {
382 shard_id: ShardId,
383 pod: Pod,
384}
385
386#[derive(Clone, Debug, Default)]
387pub struct ShardAssignment {
388 pub number_of_shards: usize,
389 pub shard_ids: HashSet<ShardId>,
390}
391
392impl ShardAssignment {
393 pub fn new(number_of_shards: usize, shard_ids: HashSet<ShardId>) -> Self {
394 Self {
395 number_of_shards,
396 shard_ids,
397 }
398 }
399
400 pub fn assign_shards(&mut self, shard_ids: &HashSet<ShardId>) {
401 for shard_id in shard_ids {
402 self.shard_ids.insert(*shard_id);
403 }
404 }
405
406 pub fn register(&mut self, number_of_shards: usize, shard_ids: &HashSet<ShardId>) {
407 self.number_of_shards = number_of_shards;
408 for shard_id in shard_ids {
409 self.shard_ids.insert(*shard_id);
410 }
411 }
412
413 pub fn revoke_shards(&mut self, shard_ids: &HashSet<ShardId>) {
414 for shard_id in shard_ids {
415 self.shard_ids.remove(shard_id);
416 }
417 }
418}
419
420impl Display for ShardAssignment {
421 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
422 let shard_ids = self
423 .shard_ids
424 .iter()
425 .map(|shard_id| shard_id.to_string())
426 .collect::<Vec<_>>()
427 .join(",");
428 write!(
429 f,
430 "{{ number_of_shards: {}, shard_ids: {} }}",
431 self.number_of_shards, shard_ids
432 )
433 }
434}
435
436#[derive(Clone, Debug, Encode, Decode, Eq, Hash, PartialEq, IntoValue)]
437#[wit_transparent]
438pub struct IdempotencyKey {
439 pub value: String,
440}
441
442impl IdempotencyKey {
443 const ROOT_NS: Uuid = uuid!("9C19B15A-C83D-46F7-9BC3-EAD7923733F4");
444
445 pub fn new(value: String) -> Self {
446 Self { value }
447 }
448
449 pub fn from_uuid(value: Uuid) -> Self {
450 Self {
451 value: value.to_string(),
452 }
453 }
454
455 pub fn fresh() -> Self {
456 Self::from_uuid(Uuid::new_v4())
457 }
458
459 pub fn derived(base: &IdempotencyKey, oplog_index: OplogIndex) -> Self {
469 let namespace = if let Ok(base_uuid) = Uuid::parse_str(&base.value) {
470 base_uuid
471 } else {
472 Uuid::new_v5(&Self::ROOT_NS, base.value.as_bytes())
473 };
474 let name = format!("oplog-index-{oplog_index}");
475 Self::from_uuid(Uuid::new_v5(&namespace, name.as_bytes()))
476 }
477}
478
479impl Serialize for IdempotencyKey {
480 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
481 where
482 S: Serializer,
483 {
484 self.value.serialize(serializer)
485 }
486}
487
488impl<'de> Deserialize<'de> for IdempotencyKey {
489 fn deserialize<D>(deserializer: D) -> Result<IdempotencyKey, D::Error>
490 where
491 D: Deserializer<'de>,
492 {
493 let value = String::deserialize(deserializer)?;
494 Ok(IdempotencyKey { value })
495 }
496}
497
498impl Display for IdempotencyKey {
499 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
500 write!(f, "{}", self.value)
501 }
502}
503
504#[derive(Clone, Debug)]
505pub struct WorkerMetadata {
506 pub worker_id: WorkerId,
507 pub args: Vec<String>,
508 pub env: Vec<(String, String)>,
509 pub project_id: ProjectId,
510 pub created_by: AccountId,
511 pub created_at: Timestamp,
512 pub parent: Option<WorkerId>,
513 pub last_known_status: WorkerStatusRecord,
514}
515
516impl WorkerMetadata {
517 pub fn default(
518 worker_id: WorkerId,
519 created_by: AccountId,
520 project_id: ProjectId,
521 ) -> WorkerMetadata {
522 WorkerMetadata {
523 worker_id,
524 args: vec![],
525 env: vec![],
526 project_id,
527 created_by,
528 created_at: Timestamp::now_utc(),
529 parent: None,
530 last_known_status: WorkerStatusRecord::default(),
531 }
532 }
533
534 pub fn owned_worker_id(&self) -> OwnedWorkerId {
535 OwnedWorkerId::new(&self.project_id, &self.worker_id)
536 }
537}
538
539impl IntoValue for WorkerMetadata {
540 fn into_value(self) -> Value {
541 Value::Record(vec![
542 self.worker_id.into_value(),
543 self.args.into_value(),
544 self.env.into_value(),
545 self.last_known_status.status.into_value(),
546 self.last_known_status.component_version.into_value(),
547 0u64.into_value(), ])
549 }
550
551 fn get_type() -> AnalysedType {
552 record(vec![
553 field("worker-id", WorkerId::get_type()),
554 field("args", list(str())),
555 field("env", list(tuple(vec![str(), str()]))),
556 field("status", WorkerStatus::get_type()),
557 field("component-version", u64()),
558 field("retry-count", u64()),
559 ])
560 }
561}
562
563#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
564pub struct WorkerResourceDescription {
565 pub created_at: Timestamp,
566 pub indexed_resource_key: Option<IndexedResourceKey>,
567}
568
569#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
570pub struct RetryConfig {
571 pub max_attempts: u32,
572 #[serde(with = "humantime_serde")]
573 pub min_delay: Duration,
574 #[serde(with = "humantime_serde")]
575 pub max_delay: Duration,
576 pub multiplier: f64,
577 pub max_jitter_factor: Option<f64>,
578}
579
580#[derive(Clone, Debug, PartialEq, Encode)]
586pub struct WorkerStatusRecord {
587 pub status: WorkerStatus,
588 pub skipped_regions: DeletedRegions,
589 pub overridden_retry_config: Option<RetryConfig>,
590 pub pending_invocations: Vec<TimestampedWorkerInvocation>,
591 pub pending_updates: VecDeque<TimestampedUpdateDescription>,
592 pub failed_updates: Vec<FailedUpdateRecord>,
593 pub successful_updates: Vec<SuccessfulUpdateRecord>,
594 pub invocation_results: HashMap<IdempotencyKey, OplogIndex>,
595 pub current_idempotency_key: Option<IdempotencyKey>,
596 pub component_version: ComponentVersion,
597 pub component_size: u64,
598 pub total_linear_memory_size: u64,
599 pub owned_resources: HashMap<WorkerResourceId, WorkerResourceDescription>,
600 pub oplog_idx: OplogIndex,
601 pub active_plugins: HashSet<PluginInstallationId>,
602 pub deleted_regions: DeletedRegions,
603 pub component_version_for_replay: ComponentVersion,
606}
607
608impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
609 fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
610 Ok(Self {
611 status: Decode::decode(decoder)?,
612 skipped_regions: Decode::decode(decoder)?,
613 overridden_retry_config: Decode::decode(decoder)?,
614 pending_invocations: Decode::decode(decoder)?,
615 pending_updates: Decode::decode(decoder)?,
616 failed_updates: Decode::decode(decoder)?,
617 successful_updates: Decode::decode(decoder)?,
618 invocation_results: Decode::decode(decoder)?,
619 current_idempotency_key: Decode::decode(decoder)?,
620 component_version: Decode::decode(decoder)?,
621 component_size: Decode::decode(decoder)?,
622 total_linear_memory_size: Decode::decode(decoder)?,
623 owned_resources: Decode::decode(decoder)?,
624 oplog_idx: Decode::decode(decoder)?,
625 active_plugins: Decode::decode(decoder)?,
626 deleted_regions: Decode::decode(decoder)?,
627 component_version_for_replay: Decode::decode(decoder)?,
628 })
629 }
630}
631impl<'de, Context> BorrowDecode<'de, Context> for WorkerStatusRecord {
632 fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
633 decoder: &mut D,
634 ) -> Result<Self, DecodeError> {
635 Ok(Self {
636 status: BorrowDecode::borrow_decode(decoder)?,
637 skipped_regions: BorrowDecode::borrow_decode(decoder)?,
638 overridden_retry_config: BorrowDecode::borrow_decode(decoder)?,
639 pending_invocations: BorrowDecode::borrow_decode(decoder)?,
640 pending_updates: BorrowDecode::borrow_decode(decoder)?,
641 failed_updates: BorrowDecode::borrow_decode(decoder)?,
642 successful_updates: BorrowDecode::borrow_decode(decoder)?,
643 invocation_results: BorrowDecode::borrow_decode(decoder)?,
644 current_idempotency_key: BorrowDecode::borrow_decode(decoder)?,
645 component_version: BorrowDecode::borrow_decode(decoder)?,
646 component_size: BorrowDecode::borrow_decode(decoder)?,
647 total_linear_memory_size: BorrowDecode::borrow_decode(decoder)?,
648 owned_resources: BorrowDecode::borrow_decode(decoder)?,
649 oplog_idx: BorrowDecode::borrow_decode(decoder)?,
650 active_plugins: BorrowDecode::borrow_decode(decoder)?,
651 deleted_regions: BorrowDecode::borrow_decode(decoder)?,
652 component_version_for_replay: BorrowDecode::borrow_decode(decoder)?,
653 })
654 }
655}
656
657impl Default for WorkerStatusRecord {
658 fn default() -> Self {
659 WorkerStatusRecord {
660 status: WorkerStatus::Idle,
661 skipped_regions: DeletedRegions::new(),
662 overridden_retry_config: None,
663 pending_invocations: Vec::new(),
664 pending_updates: VecDeque::new(),
665 failed_updates: Vec::new(),
666 successful_updates: Vec::new(),
667 invocation_results: HashMap::new(),
668 current_idempotency_key: None,
669 component_version: 0,
670 component_size: 0,
671 total_linear_memory_size: 0,
672 owned_resources: HashMap::new(),
673 oplog_idx: OplogIndex::default(),
674 active_plugins: HashSet::new(),
675 deleted_regions: DeletedRegions::new(),
676 component_version_for_replay: 0,
677 }
678 }
679}
680
681#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
682pub struct FailedUpdateRecord {
683 pub timestamp: Timestamp,
684 pub target_version: ComponentVersion,
685 pub details: Option<String>,
686}
687
688#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
689pub struct SuccessfulUpdateRecord {
690 pub timestamp: Timestamp,
691 pub target_version: ComponentVersion,
692}
693
694#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode, IntoValue)]
699#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
700pub enum WorkerStatus {
701 Running,
703 Idle,
705 Suspended,
707 Interrupted,
709 Retrying,
711 Failed,
713 Exited,
715}
716
717impl PartialOrd for WorkerStatus {
718 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
719 Some(self.cmp(other))
720 }
721}
722
723impl Ord for WorkerStatus {
724 fn cmp(&self, other: &Self) -> Ordering {
725 let v1: i32 = self.clone().into();
726 let v2: i32 = other.clone().into();
727 v1.cmp(&v2)
728 }
729}
730
731impl FromStr for WorkerStatus {
732 type Err = String;
733
734 fn from_str(s: &str) -> Result<Self, Self::Err> {
735 match s.to_lowercase().as_str() {
736 "running" => Ok(WorkerStatus::Running),
737 "idle" => Ok(WorkerStatus::Idle),
738 "suspended" => Ok(WorkerStatus::Suspended),
739 "interrupted" => Ok(WorkerStatus::Interrupted),
740 "retrying" => Ok(WorkerStatus::Retrying),
741 "failed" => Ok(WorkerStatus::Failed),
742 "exited" => Ok(WorkerStatus::Exited),
743 _ => Err(format!("Unknown worker status: {s}")),
744 }
745 }
746}
747
748impl Display for WorkerStatus {
749 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
750 match self {
751 WorkerStatus::Running => write!(f, "Running"),
752 WorkerStatus::Idle => write!(f, "Idle"),
753 WorkerStatus::Suspended => write!(f, "Suspended"),
754 WorkerStatus::Interrupted => write!(f, "Interrupted"),
755 WorkerStatus::Retrying => write!(f, "Retrying"),
756 WorkerStatus::Failed => write!(f, "Failed"),
757 WorkerStatus::Exited => write!(f, "Exited"),
758 }
759 }
760}
761
762impl TryFrom<i32> for WorkerStatus {
763 type Error = String;
764
765 fn try_from(value: i32) -> Result<Self, Self::Error> {
766 match value {
767 0 => Ok(WorkerStatus::Running),
768 1 => Ok(WorkerStatus::Idle),
769 2 => Ok(WorkerStatus::Suspended),
770 3 => Ok(WorkerStatus::Interrupted),
771 4 => Ok(WorkerStatus::Retrying),
772 5 => Ok(WorkerStatus::Failed),
773 6 => Ok(WorkerStatus::Exited),
774 _ => Err(format!("Unknown worker status: {value}")),
775 }
776 }
777}
778
779impl From<WorkerStatus> for i32 {
780 fn from(value: WorkerStatus) -> Self {
781 match value {
782 WorkerStatus::Running => 0,
783 WorkerStatus::Idle => 1,
784 WorkerStatus::Suspended => 2,
785 WorkerStatus::Interrupted => 3,
786 WorkerStatus::Retrying => 4,
787 WorkerStatus::Failed => 5,
788 WorkerStatus::Exited => 6,
789 }
790 }
791}
792
793#[derive(Clone, Debug, PartialEq, Encode, Decode)]
796enum SerializedWorkerInvocation {
797 ExportedFunctionV1 {
798 idempotency_key: IdempotencyKey,
799 full_function_name: String,
800 function_input: Vec<Value>,
801 },
802 ManualUpdate {
803 target_version: ComponentVersion,
804 },
805 ExportedFunction {
806 idempotency_key: IdempotencyKey,
807 full_function_name: String,
808 function_input: Vec<Value>,
809 invocation_context: InvocationContextStack,
810 },
811}
812
813impl From<WorkerInvocation> for SerializedWorkerInvocation {
814 fn from(value: WorkerInvocation) -> Self {
815 match value {
816 WorkerInvocation::ManualUpdate { target_version } => {
817 Self::ManualUpdate { target_version }
818 }
819 WorkerInvocation::ExportedFunction {
820 idempotency_key,
821 full_function_name,
822 function_input,
823 invocation_context,
824 } => Self::ExportedFunction {
825 idempotency_key,
826 full_function_name,
827 function_input,
828 invocation_context,
829 },
830 }
831 }
832}
833
834impl From<SerializedWorkerInvocation> for WorkerInvocation {
835 fn from(value: SerializedWorkerInvocation) -> Self {
836 match value {
837 SerializedWorkerInvocation::ExportedFunctionV1 {
838 idempotency_key,
839 full_function_name,
840 function_input,
841 } => Self::ExportedFunction {
842 idempotency_key,
843 full_function_name,
844 function_input,
845 invocation_context: InvocationContextStack::fresh(),
846 },
847 SerializedWorkerInvocation::ManualUpdate { target_version } => {
848 Self::ManualUpdate { target_version }
849 }
850 SerializedWorkerInvocation::ExportedFunction {
851 idempotency_key,
852 full_function_name,
853 function_input,
854 invocation_context,
855 } => Self::ExportedFunction {
856 idempotency_key,
857 full_function_name,
858 function_input,
859 invocation_context,
860 },
861 }
862 }
863}
864
865#[derive(Clone, Debug, PartialEq)]
866pub enum WorkerInvocation {
867 ManualUpdate {
868 target_version: ComponentVersion,
869 },
870 ExportedFunction {
871 idempotency_key: IdempotencyKey,
872 full_function_name: String,
873 function_input: Vec<Value>,
874 invocation_context: InvocationContextStack,
875 },
876}
877
878impl WorkerInvocation {
879 pub fn is_idempotency_key(&self, key: &IdempotencyKey) -> bool {
880 match self {
881 Self::ExportedFunction {
882 idempotency_key, ..
883 } => idempotency_key == key,
884 _ => false,
885 }
886 }
887
888 pub fn idempotency_key(&self) -> Option<&IdempotencyKey> {
889 match self {
890 Self::ExportedFunction {
891 idempotency_key, ..
892 } => Some(idempotency_key),
893 _ => None,
894 }
895 }
896
897 pub fn invocation_context(&self) -> InvocationContextStack {
898 match self {
899 Self::ExportedFunction {
900 invocation_context, ..
901 } => invocation_context.clone(),
902 _ => InvocationContextStack::fresh(),
903 }
904 }
905}
906
907impl Encode for WorkerInvocation {
908 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
909 let serialized: SerializedWorkerInvocation = self.clone().into();
910 serialized.encode(encoder)
911 }
912}
913
914impl<Context> Decode<Context> for WorkerInvocation {
915 fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
916 let serialized: SerializedWorkerInvocation = Decode::decode(decoder)?;
917 Ok(serialized.into())
918 }
919}
920
921impl<'de, Context> BorrowDecode<'de, Context> for WorkerInvocation {
922 fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
923 decoder: &mut D,
924 ) -> Result<Self, DecodeError> {
925 let serialized: SerializedWorkerInvocation = BorrowDecode::borrow_decode(decoder)?;
926 Ok(serialized.into())
927 }
928}
929
930#[derive(Clone, Debug, PartialEq, Encode, Decode)]
931pub struct TimestampedWorkerInvocation {
932 pub timestamp: Timestamp,
933 pub invocation: WorkerInvocation,
934}
935
936#[derive(
937 Clone,
938 Debug,
939 PartialOrd,
940 Ord,
941 derive_more::FromStr,
942 Eq,
943 Hash,
944 PartialEq,
945 Serialize,
946 Deserialize,
947 Encode,
948 Decode,
949 IntoValue,
950)]
951#[serde(transparent)]
952pub struct AccountId {
953 pub value: String,
954}
955
956impl AccountId {
957 pub fn generate() -> Self {
958 Self {
959 value: Uuid::new_v4().to_string(),
960 }
961 }
962}
963
964impl From<&str> for AccountId {
965 fn from(value: &str) -> Self {
966 Self {
967 value: value.to_string(),
968 }
969 }
970}
971
972impl Display for AccountId {
973 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
974 write!(f, "{}", &self.value)
975 }
976}
977
978#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
979#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
980#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
981#[serde(rename_all = "camelCase")]
982pub struct WorkerNameFilter {
983 pub comparator: StringFilterComparator,
984 pub value: String,
985}
986
987impl WorkerNameFilter {
988 pub fn new(comparator: StringFilterComparator, value: String) -> Self {
989 Self { comparator, value }
990 }
991}
992
993impl Display for WorkerNameFilter {
994 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
995 write!(f, "name {} {}", self.comparator, self.value)
996 }
997}
998
999#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1000#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1001#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1002#[serde(rename_all = "camelCase")]
1003pub struct WorkerStatusFilter {
1004 pub comparator: FilterComparator,
1005 pub value: WorkerStatus,
1006}
1007
1008impl WorkerStatusFilter {
1009 pub fn new(comparator: FilterComparator, value: WorkerStatus) -> Self {
1010 Self { comparator, value }
1011 }
1012}
1013
1014impl Display for WorkerStatusFilter {
1015 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1016 write!(f, "status == {:?}", self.value)
1017 }
1018}
1019
1020#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1021#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1022#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1023#[serde(rename_all = "camelCase")]
1024pub struct WorkerVersionFilter {
1025 pub comparator: FilterComparator,
1026 pub value: ComponentVersion,
1027}
1028
1029impl WorkerVersionFilter {
1030 pub fn new(comparator: FilterComparator, value: ComponentVersion) -> Self {
1031 Self { comparator, value }
1032 }
1033}
1034
1035impl Display for WorkerVersionFilter {
1036 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1037 write!(f, "version {} {}", self.comparator, self.value)
1038 }
1039}
1040
1041#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1042#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1043#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1044#[serde(rename_all = "camelCase")]
1045pub struct WorkerCreatedAtFilter {
1046 pub comparator: FilterComparator,
1047 pub value: Timestamp,
1048}
1049
1050impl WorkerCreatedAtFilter {
1051 pub fn new(comparator: FilterComparator, value: Timestamp) -> Self {
1052 Self { comparator, value }
1053 }
1054}
1055
1056impl Display for WorkerCreatedAtFilter {
1057 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1058 write!(f, "created_at {} {}", self.comparator, self.value)
1059 }
1060}
1061
1062#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1063#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1064#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1065#[serde(rename_all = "camelCase")]
1066pub struct WorkerEnvFilter {
1067 pub name: String,
1068 pub comparator: StringFilterComparator,
1069 pub value: String,
1070}
1071
1072impl WorkerEnvFilter {
1073 pub fn new(name: String, comparator: StringFilterComparator, value: String) -> Self {
1074 Self {
1075 name,
1076 comparator,
1077 value,
1078 }
1079 }
1080}
1081
1082impl Display for WorkerEnvFilter {
1083 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1084 write!(f, "env.{} {} {}", self.name, self.comparator, self.value)
1085 }
1086}
1087
1088#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1089#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1090#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1091#[serde(rename_all = "camelCase")]
1092pub struct WorkerAndFilter {
1093 pub filters: Vec<WorkerFilter>,
1094}
1095
1096impl WorkerAndFilter {
1097 pub fn new(filters: Vec<WorkerFilter>) -> Self {
1098 Self { filters }
1099 }
1100}
1101
1102impl Display for WorkerAndFilter {
1103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104 write!(
1105 f,
1106 "({})",
1107 self.filters
1108 .iter()
1109 .map(|f| f.clone().to_string())
1110 .collect::<Vec<String>>()
1111 .join(" AND ")
1112 )
1113 }
1114}
1115
1116#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1117#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1118#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1119#[serde(rename_all = "camelCase")]
1120pub struct WorkerOrFilter {
1121 pub filters: Vec<WorkerFilter>,
1122}
1123
1124impl WorkerOrFilter {
1125 pub fn new(filters: Vec<WorkerFilter>) -> Self {
1126 Self { filters }
1127 }
1128}
1129
1130impl Display for WorkerOrFilter {
1131 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1132 write!(
1133 f,
1134 "({})",
1135 self.filters
1136 .iter()
1137 .map(|f| f.clone().to_string())
1138 .collect::<Vec<String>>()
1139 .join(" OR ")
1140 )
1141 }
1142}
1143
1144#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1145#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1146#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1147#[serde(rename_all = "camelCase")]
1148pub struct WorkerNotFilter {
1149 filter: Box<WorkerFilter>,
1150}
1151
1152impl WorkerNotFilter {
1153 pub fn new(filter: WorkerFilter) -> Self {
1154 Self {
1155 filter: Box::new(filter),
1156 }
1157 }
1158}
1159
1160impl Display for WorkerNotFilter {
1161 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1162 write!(f, "NOT ({})", self.filter)
1163 }
1164}
1165
1166#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1167#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
1168#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
1169#[serde(tag = "type")]
1170pub enum WorkerFilter {
1171 Name(WorkerNameFilter),
1172 Status(WorkerStatusFilter),
1173 Version(WorkerVersionFilter),
1174 CreatedAt(WorkerCreatedAtFilter),
1175 Env(WorkerEnvFilter),
1176 And(WorkerAndFilter),
1177 Or(WorkerOrFilter),
1178 Not(WorkerNotFilter),
1179}
1180
1181impl WorkerFilter {
1182 pub fn and(&self, filter: WorkerFilter) -> Self {
1183 match self.clone() {
1184 WorkerFilter::And(WorkerAndFilter { filters }) => {
1185 Self::new_and([filters, vec![filter]].concat())
1186 }
1187 f => Self::new_and(vec![f, filter]),
1188 }
1189 }
1190
1191 pub fn or(&self, filter: WorkerFilter) -> Self {
1192 match self.clone() {
1193 WorkerFilter::Or(WorkerOrFilter { filters }) => {
1194 Self::new_or([filters, vec![filter]].concat())
1195 }
1196 f => Self::new_or(vec![f, filter]),
1197 }
1198 }
1199
1200 pub fn not(&self) -> Self {
1201 Self::new_not(self.clone())
1202 }
1203
1204 pub fn matches(&self, metadata: &WorkerMetadata) -> bool {
1205 match self.clone() {
1206 WorkerFilter::Name(WorkerNameFilter { comparator, value }) => {
1207 comparator.matches(&metadata.worker_id.worker_name, &value)
1208 }
1209 WorkerFilter::Version(WorkerVersionFilter { comparator, value }) => {
1210 let version: ComponentVersion = metadata.last_known_status.component_version;
1211 comparator.matches(&version, &value)
1212 }
1213 WorkerFilter::Env(WorkerEnvFilter {
1214 name,
1215 comparator,
1216 value,
1217 }) => {
1218 let mut result = false;
1219 let name = name.to_lowercase();
1220 for env_value in metadata.env.clone() {
1221 if env_value.0.to_lowercase() == name {
1222 result = comparator.matches(&env_value.1, &value);
1223
1224 break;
1225 }
1226 }
1227 result
1228 }
1229 WorkerFilter::CreatedAt(WorkerCreatedAtFilter { comparator, value }) => {
1230 comparator.matches(&metadata.created_at, &value)
1231 }
1232 WorkerFilter::Status(WorkerStatusFilter { comparator, value }) => {
1233 comparator.matches(&metadata.last_known_status.status, &value)
1234 }
1235 WorkerFilter::Not(WorkerNotFilter { filter }) => !filter.matches(metadata),
1236 WorkerFilter::And(WorkerAndFilter { filters }) => {
1237 let mut result = true;
1238 for filter in filters {
1239 if !filter.matches(metadata) {
1240 result = false;
1241 break;
1242 }
1243 }
1244 result
1245 }
1246 WorkerFilter::Or(WorkerOrFilter { filters }) => {
1247 let mut result = true;
1248 if !filters.is_empty() {
1249 result = false;
1250 for filter in filters {
1251 if filter.matches(metadata) {
1252 result = true;
1253 break;
1254 }
1255 }
1256 }
1257 result
1258 }
1259 }
1260 }
1261
1262 pub fn new_and(filters: Vec<WorkerFilter>) -> Self {
1263 WorkerFilter::And(WorkerAndFilter::new(filters))
1264 }
1265
1266 pub fn new_or(filters: Vec<WorkerFilter>) -> Self {
1267 WorkerFilter::Or(WorkerOrFilter::new(filters))
1268 }
1269
1270 pub fn new_not(filter: WorkerFilter) -> Self {
1271 WorkerFilter::Not(WorkerNotFilter::new(filter))
1272 }
1273
1274 pub fn new_name(comparator: StringFilterComparator, value: String) -> Self {
1275 WorkerFilter::Name(WorkerNameFilter::new(comparator, value))
1276 }
1277
1278 pub fn new_env(name: String, comparator: StringFilterComparator, value: String) -> Self {
1279 WorkerFilter::Env(WorkerEnvFilter::new(name, comparator, value))
1280 }
1281
1282 pub fn new_version(comparator: FilterComparator, value: ComponentVersion) -> Self {
1283 WorkerFilter::Version(WorkerVersionFilter::new(comparator, value))
1284 }
1285
1286 pub fn new_status(comparator: FilterComparator, value: WorkerStatus) -> Self {
1287 WorkerFilter::Status(WorkerStatusFilter::new(comparator, value))
1288 }
1289
1290 pub fn new_created_at(comparator: FilterComparator, value: Timestamp) -> Self {
1291 WorkerFilter::CreatedAt(WorkerCreatedAtFilter::new(comparator, value))
1292 }
1293
1294 pub fn from(filters: Vec<String>) -> Result<WorkerFilter, String> {
1295 let mut fs = Vec::new();
1296 for f in filters {
1297 fs.push(WorkerFilter::from_str(&f)?);
1298 }
1299 Ok(WorkerFilter::new_and(fs))
1300 }
1301}
1302
1303impl Display for WorkerFilter {
1304 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1305 match self {
1306 WorkerFilter::Name(filter) => {
1307 write!(f, "{filter}")
1308 }
1309 WorkerFilter::Version(filter) => {
1310 write!(f, "{filter}")
1311 }
1312 WorkerFilter::Status(filter) => {
1313 write!(f, "{filter}")
1314 }
1315 WorkerFilter::CreatedAt(filter) => {
1316 write!(f, "{filter}")
1317 }
1318 WorkerFilter::Env(filter) => {
1319 write!(f, "{filter}")
1320 }
1321 WorkerFilter::Not(filter) => {
1322 write!(f, "{filter}")
1323 }
1324 WorkerFilter::And(filter) => {
1325 write!(f, "{filter}")
1326 }
1327 WorkerFilter::Or(filter) => {
1328 write!(f, "{filter}")
1329 }
1330 }
1331 }
1332}
1333
1334impl FromStr for WorkerFilter {
1335 type Err = String;
1336
1337 fn from_str(s: &str) -> Result<Self, Self::Err> {
1338 let elements = s.split_whitespace().collect::<Vec<&str>>();
1339
1340 if elements.len() == 3 {
1341 let arg = elements[0];
1342 let comparator = elements[1];
1343 let value = elements[2];
1344 match arg {
1345 "name" => Ok(WorkerFilter::new_name(
1346 comparator.parse()?,
1347 value.to_string(),
1348 )),
1349 "version" => Ok(WorkerFilter::new_version(
1350 comparator.parse()?,
1351 value
1352 .parse()
1353 .map_err(|e| format!("Invalid filter value: {e}"))?,
1354 )),
1355 "status" => Ok(WorkerFilter::new_status(
1356 comparator.parse()?,
1357 value.parse()?,
1358 )),
1359 "created_at" | "createdAt" => Ok(WorkerFilter::new_created_at(
1360 comparator.parse()?,
1361 value.parse()?,
1362 )),
1363 _ if arg.starts_with("env.") => {
1364 let name = &arg[4..];
1365 Ok(WorkerFilter::new_env(
1366 name.to_string(),
1367 comparator.parse()?,
1368 value.to_string(),
1369 ))
1370 }
1371 _ => Err(format!("Invalid filter: {s}")),
1372 }
1373 } else {
1374 Err(format!("Invalid filter: {s}"))
1375 }
1376 }
1377}
1378
1379#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1380#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1381pub enum StringFilterComparator {
1382 Equal,
1383 NotEqual,
1384 Like,
1385 NotLike,
1386}
1387
1388impl StringFilterComparator {
1389 pub fn matches<T: Display>(&self, value1: &T, value2: &T) -> bool {
1390 match self {
1391 StringFilterComparator::Equal => value1.to_string() == value2.to_string(),
1392 StringFilterComparator::NotEqual => value1.to_string() != value2.to_string(),
1393 StringFilterComparator::Like => {
1394 value1.to_string().contains(value2.to_string().as_str())
1395 }
1396 StringFilterComparator::NotLike => {
1397 !value1.to_string().contains(value2.to_string().as_str())
1398 }
1399 }
1400 }
1401}
1402
1403impl FromStr for StringFilterComparator {
1404 type Err = String;
1405
1406 fn from_str(s: &str) -> Result<Self, Self::Err> {
1407 match s.to_lowercase().as_str() {
1408 "==" | "=" | "equal" | "eq" => Ok(StringFilterComparator::Equal),
1409 "!=" | "notequal" | "ne" => Ok(StringFilterComparator::NotEqual),
1410 "like" => Ok(StringFilterComparator::Like),
1411 "notlike" => Ok(StringFilterComparator::NotLike),
1412 _ => Err(format!("Unknown String Filter Comparator: {s}")),
1413 }
1414 }
1415}
1416
1417impl TryFrom<i32> for StringFilterComparator {
1418 type Error = String;
1419
1420 fn try_from(value: i32) -> Result<Self, Self::Error> {
1421 match value {
1422 0 => Ok(StringFilterComparator::Equal),
1423 1 => Ok(StringFilterComparator::NotEqual),
1424 2 => Ok(StringFilterComparator::Like),
1425 3 => Ok(StringFilterComparator::NotLike),
1426 _ => Err(format!("Unknown String Filter Comparator: {value}")),
1427 }
1428 }
1429}
1430
1431impl From<StringFilterComparator> for i32 {
1432 fn from(value: StringFilterComparator) -> Self {
1433 match value {
1434 StringFilterComparator::Equal => 0,
1435 StringFilterComparator::NotEqual => 1,
1436 StringFilterComparator::Like => 2,
1437 StringFilterComparator::NotLike => 3,
1438 }
1439 }
1440}
1441
1442impl Display for StringFilterComparator {
1443 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1444 let s = match self {
1445 StringFilterComparator::Equal => "==",
1446 StringFilterComparator::NotEqual => "!=",
1447 StringFilterComparator::Like => "like",
1448 StringFilterComparator::NotLike => "notlike",
1449 };
1450 write!(f, "{s}")
1451 }
1452}
1453
1454#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1455#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1456pub enum FilterComparator {
1457 Equal,
1458 NotEqual,
1459 GreaterEqual,
1460 Greater,
1461 LessEqual,
1462 Less,
1463}
1464
1465impl Display for FilterComparator {
1466 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1467 let s = match self {
1468 FilterComparator::Equal => "==",
1469 FilterComparator::NotEqual => "!=",
1470 FilterComparator::GreaterEqual => ">=",
1471 FilterComparator::Greater => ">",
1472 FilterComparator::LessEqual => "<=",
1473 FilterComparator::Less => "<",
1474 };
1475 write!(f, "{s}")
1476 }
1477}
1478
1479impl FilterComparator {
1480 pub fn matches<T: Ord>(&self, value1: &T, value2: &T) -> bool {
1481 match self {
1482 FilterComparator::Equal => value1 == value2,
1483 FilterComparator::NotEqual => value1 != value2,
1484 FilterComparator::Less => value1 < value2,
1485 FilterComparator::LessEqual => value1 <= value2,
1486 FilterComparator::Greater => value1 > value2,
1487 FilterComparator::GreaterEqual => value1 >= value2,
1488 }
1489 }
1490}
1491
1492impl FromStr for FilterComparator {
1493 type Err = String;
1494 fn from_str(s: &str) -> Result<Self, Self::Err> {
1495 match s.to_lowercase().as_str() {
1496 "==" | "=" | "equal" | "eq" => Ok(FilterComparator::Equal),
1497 "!=" | "notequal" | "ne" => Ok(FilterComparator::NotEqual),
1498 ">=" | "greaterequal" | "ge" => Ok(FilterComparator::GreaterEqual),
1499 ">" | "greater" | "gt" => Ok(FilterComparator::Greater),
1500 "<=" | "lessequal" | "le" => Ok(FilterComparator::LessEqual),
1501 "<" | "less" | "lt" => Ok(FilterComparator::Less),
1502 _ => Err(format!("Unknown Filter Comparator: {s}")),
1503 }
1504 }
1505}
1506
1507impl TryFrom<i32> for FilterComparator {
1508 type Error = String;
1509
1510 fn try_from(value: i32) -> Result<Self, Self::Error> {
1511 match value {
1512 0 => Ok(FilterComparator::Equal),
1513 1 => Ok(FilterComparator::NotEqual),
1514 2 => Ok(FilterComparator::Less),
1515 3 => Ok(FilterComparator::LessEqual),
1516 4 => Ok(FilterComparator::Greater),
1517 5 => Ok(FilterComparator::GreaterEqual),
1518 _ => Err(format!("Unknown Filter Comparator: {value}")),
1519 }
1520 }
1521}
1522
1523impl From<FilterComparator> for i32 {
1524 fn from(value: FilterComparator) -> Self {
1525 match value {
1526 FilterComparator::Equal => 0,
1527 FilterComparator::NotEqual => 1,
1528 FilterComparator::Less => 2,
1529 FilterComparator::LessEqual => 3,
1530 FilterComparator::Greater => 4,
1531 FilterComparator::GreaterEqual => 5,
1532 }
1533 }
1534}
1535
1536#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, Default)]
1537#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1538#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1539#[serde(rename_all = "camelCase")]
1540pub struct ScanCursor {
1541 pub cursor: u64,
1542 pub layer: usize,
1543}
1544
1545impl ScanCursor {
1546 pub fn is_active_layer_finished(&self) -> bool {
1547 self.cursor == 0
1548 }
1549
1550 pub fn is_finished(&self) -> bool {
1551 self.cursor == 0 && self.layer == 0
1552 }
1553
1554 pub fn into_option(self) -> Option<Self> {
1555 if self.is_finished() {
1556 None
1557 } else {
1558 Some(self)
1559 }
1560 }
1561}
1562
1563impl Display for ScanCursor {
1564 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1565 write!(f, "{}/{}", self.layer, self.cursor)
1566 }
1567}
1568
1569impl FromStr for ScanCursor {
1570 type Err = String;
1571
1572 fn from_str(s: &str) -> Result<Self, Self::Err> {
1573 let parts = s.split('/').collect::<Vec<&str>>();
1574 if parts.len() == 2 {
1575 Ok(ScanCursor {
1576 layer: parts[0]
1577 .parse()
1578 .map_err(|e| format!("Invalid layer part: {e}"))?,
1579 cursor: parts[1]
1580 .parse()
1581 .map_err(|e| format!("Invalid cursor part: {e}"))?,
1582 })
1583 } else {
1584 Err("Invalid cursor, must have 'layer/cursor' format".to_string())
1585 }
1586 }
1587}
1588
1589#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
1590pub enum LogLevel {
1591 Trace,
1592 Debug,
1593 Info,
1594 Warn,
1595 Error,
1596 Critical,
1597}
1598
1599#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1600pub enum WorkerEvent {
1601 StdOut {
1602 timestamp: Timestamp,
1603 bytes: Vec<u8>,
1604 },
1605 StdErr {
1606 timestamp: Timestamp,
1607 bytes: Vec<u8>,
1608 },
1609 Log {
1610 timestamp: Timestamp,
1611 level: LogLevel,
1612 context: String,
1613 message: String,
1614 },
1615 InvocationStart {
1616 timestamp: Timestamp,
1617 function: String,
1618 idempotency_key: IdempotencyKey,
1619 },
1620 InvocationFinished {
1621 timestamp: Timestamp,
1622 function: String,
1623 idempotency_key: IdempotencyKey,
1624 },
1625 ClientLagged { number_of_missed_messages: u64 },
1628}
1629
1630impl Display for WorkerEvent {
1631 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1632 match self {
1633 WorkerEvent::StdOut { bytes, .. } => {
1634 write!(
1635 f,
1636 "<stdout> {}",
1637 String::from_utf8(bytes.clone()).unwrap_or_default()
1638 )
1639 }
1640 WorkerEvent::StdErr { bytes, .. } => {
1641 write!(
1642 f,
1643 "<stderr> {}",
1644 String::from_utf8(bytes.clone()).unwrap_or_default()
1645 )
1646 }
1647 WorkerEvent::Log {
1648 level,
1649 context,
1650 message,
1651 ..
1652 } => {
1653 write!(f, "<log> {level:?} {context} {message}")
1654 }
1655 WorkerEvent::InvocationStart {
1656 function,
1657 idempotency_key,
1658 ..
1659 } => {
1660 write!(f, "<invocation-start> {function} {idempotency_key}")
1661 }
1662 WorkerEvent::InvocationFinished {
1663 function,
1664 idempotency_key,
1665 ..
1666 } => {
1667 write!(f, "<invocation-finished> {function} {idempotency_key}")
1668 }
1669 WorkerEvent::ClientLagged {
1670 number_of_missed_messages,
1671 } => {
1672 write!(f, "<client-lagged> {number_of_missed_messages}")
1673 }
1674 }
1675 }
1676}
1677
1678#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
1679#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1680#[repr(i32)]
1681pub enum ComponentType {
1682 Durable = 0,
1683 Ephemeral = 1,
1684}
1685
1686impl TryFrom<i32> for ComponentType {
1687 type Error = String;
1688
1689 fn try_from(value: i32) -> Result<Self, Self::Error> {
1690 match value {
1691 0 => Ok(ComponentType::Durable),
1692 1 => Ok(ComponentType::Ephemeral),
1693 _ => Err(format!("Unknown Component Type: {value}")),
1694 }
1695 }
1696}
1697
1698impl Display for ComponentType {
1699 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1700 let s = match self {
1701 ComponentType::Durable => "Durable",
1702 ComponentType::Ephemeral => "Ephemeral",
1703 };
1704 write!(f, "{s}")
1705 }
1706}
1707
1708impl FromStr for ComponentType {
1709 type Err = String;
1710
1711 fn from_str(s: &str) -> Result<Self, Self::Err> {
1712 match s {
1713 "Durable" => Ok(ComponentType::Durable),
1714 "Ephemeral" => Ok(ComponentType::Ephemeral),
1715 _ => Err(format!("Unknown Component Type: {s}")),
1716 }
1717 }
1718}
1719
1720#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1721#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1722#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1723#[serde(rename_all = "camelCase")]
1724pub struct Empty {}
1725
1726#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1729#[cfg_attr(feature = "poem", derive(poem_openapi::NewType))]
1730pub struct InitialComponentFileKey(pub String);
1731
1732impl Display for InitialComponentFileKey {
1733 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1734 self.0.fmt(f)
1735 }
1736}
1737
1738#[derive(Clone, Debug, Eq, PartialEq, Hash)]
1744pub struct ComponentFilePath(Utf8UnixPathBuf);
1745
1746impl ComponentFilePath {
1747 pub fn from_abs_str(s: &str) -> Result<Self, String> {
1748 let buf: Utf8UnixPathBuf = s.into();
1749 if !buf.is_absolute() {
1750 return Err("Path must be absolute".to_string());
1751 }
1752
1753 Ok(ComponentFilePath(buf.normalize()))
1754 }
1755
1756 pub fn from_rel_str(s: &str) -> Result<Self, String> {
1757 Self::from_abs_str(&format!("/{s}"))
1758 }
1759
1760 pub fn from_either_str(s: &str) -> Result<Self, String> {
1761 if s.starts_with('/') {
1762 Self::from_abs_str(s)
1763 } else {
1764 Self::from_rel_str(s)
1765 }
1766 }
1767
1768 pub fn as_path(&self) -> &Utf8UnixPathBuf {
1769 &self.0
1770 }
1771
1772 pub fn to_rel_string(&self) -> String {
1773 self.0.strip_prefix("/").unwrap().to_string()
1774 }
1775
1776 pub fn extend(&mut self, path: &str) -> Result<(), String> {
1777 self.0.push_checked(path).map_err(|e| e.to_string())
1778 }
1779}
1780
1781impl Display for ComponentFilePath {
1782 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1783 self.0.fmt(f)
1784 }
1785}
1786
1787impl Serialize for ComponentFilePath {
1788 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1789 where
1790 S: Serializer,
1791 {
1792 String::serialize(&self.to_string(), serializer)
1793 }
1794}
1795
1796impl<'de> Deserialize<'de> for ComponentFilePath {
1797 fn deserialize<D>(deserializer: D) -> Result<ComponentFilePath, D::Error>
1798 where
1799 D: Deserializer<'de>,
1800 {
1801 let str = String::deserialize(deserializer)?;
1802 Self::from_abs_str(&str).map_err(de::Error::custom)
1803 }
1804}
1805
1806impl TryFrom<&str> for ComponentFilePath {
1807 type Error = String;
1808 fn try_from(value: &str) -> Result<Self, Self::Error> {
1809 Self::from_either_str(value)
1810 }
1811}
1812
1813#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
1814#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1815#[serde(rename_all = "kebab-case")]
1816#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
1817pub enum ComponentFilePermissions {
1818 ReadOnly,
1819 ReadWrite,
1820}
1821
1822impl ComponentFilePermissions {
1823 pub fn as_compact_str(&self) -> &'static str {
1824 match self {
1825 ComponentFilePermissions::ReadOnly => "ro",
1826 ComponentFilePermissions::ReadWrite => "rw",
1827 }
1828 }
1829 pub fn from_compact_str(s: &str) -> Result<Self, String> {
1830 match s {
1831 "ro" => Ok(ComponentFilePermissions::ReadOnly),
1832 "rw" => Ok(ComponentFilePermissions::ReadWrite),
1833 _ => Err(format!("Unknown permissions: {s}")),
1834 }
1835 }
1836}
1837
1838#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1839#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1840#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1841#[serde(rename_all = "camelCase")]
1842pub struct InitialComponentFile {
1843 pub key: InitialComponentFileKey,
1844 pub path: ComponentFilePath,
1845 pub permissions: ComponentFilePermissions,
1846}
1847
1848impl InitialComponentFile {
1849 pub fn is_read_only(&self) -> bool {
1850 self.permissions == ComponentFilePermissions::ReadOnly
1851 }
1852}
1853
1854#[derive(Clone, Debug, Serialize, Deserialize)]
1855#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1856#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1857#[serde(rename_all = "camelCase")]
1858pub struct ComponentFilePathWithPermissions {
1859 pub path: ComponentFilePath,
1860 pub permissions: ComponentFilePermissions,
1861}
1862
1863impl ComponentFilePathWithPermissions {
1864 pub fn extend_path(&mut self, path: &str) -> Result<(), String> {
1865 self.path.extend(path)
1866 }
1867}
1868
1869impl Display for ComponentFilePathWithPermissions {
1870 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1871 write!(f, "{}", serde_json::to_string(self).unwrap())
1872 }
1873}
1874
1875#[derive(Clone, Debug, Serialize, Deserialize)]
1876#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1877#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1878#[serde(rename_all = "camelCase")]
1879pub struct ComponentFilePathWithPermissionsList {
1880 pub values: Vec<ComponentFilePathWithPermissions>,
1881}
1882
1883impl Display for ComponentFilePathWithPermissionsList {
1884 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1885 write!(f, "{}", serde_json::to_string(self).unwrap())
1886 }
1887}
1888
1889#[derive(Clone, Debug, PartialEq)]
1890pub enum ComponentFileSystemNodeDetails {
1891 File {
1892 permissions: ComponentFilePermissions,
1893 size: u64,
1894 },
1895 Directory,
1896}
1897
1898#[derive(Clone, Debug, PartialEq)]
1899pub struct ComponentFileSystemNode {
1900 pub name: String,
1901 pub last_modified: SystemTime,
1902 pub details: ComponentFileSystemNodeDetails,
1903}
1904
1905#[derive(Debug, Clone, PartialEq, Serialize, Encode, Decode, Default)]
1906#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1907#[serde(rename_all = "kebab-case")]
1908#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
1909pub enum GatewayBindingType {
1910 #[default]
1911 Default,
1912 FileServer,
1913 HttpHandler,
1914 CorsPreflight,
1915}
1916
1917impl<'de> Deserialize<'de> for GatewayBindingType {
1919 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1920 where
1921 D: Deserializer<'de>,
1922 {
1923 struct GatewayBindingTypeVisitor;
1924
1925 impl de::Visitor<'_> for GatewayBindingTypeVisitor {
1926 type Value = GatewayBindingType;
1927
1928 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
1929 formatter.write_str("a string representing the binding type")
1930 }
1931
1932 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
1933 where
1934 E: de::Error,
1935 {
1936 match value {
1937 "default" | "wit-worker" => Ok(GatewayBindingType::Default),
1938 "file-server" => Ok(GatewayBindingType::FileServer),
1939 "cors-preflight" => Ok(GatewayBindingType::CorsPreflight),
1940 _ => Err(de::Error::invalid_value(Unexpected::Str(value), &self)),
1941 }
1942 }
1943 }
1944
1945 deserializer.deserialize_str(GatewayBindingTypeVisitor)
1946 }
1947}
1948
1949impl TryFrom<String> for GatewayBindingType {
1950 type Error = String;
1951
1952 fn try_from(value: String) -> Result<Self, Self::Error> {
1953 match value.as_str() {
1954 "default" => Ok(GatewayBindingType::Default),
1955 "file-server" => Ok(GatewayBindingType::FileServer),
1956 _ => Err(format!("Invalid WorkerBindingType: {value}")),
1957 }
1958 }
1959}
1960
1961impl From<WorkerId> for golem_wasm_rpc::WorkerId {
1962 fn from(worker_id: WorkerId) -> Self {
1963 golem_wasm_rpc::WorkerId {
1964 component_id: worker_id.component_id.into(),
1965 worker_name: worker_id.worker_name,
1966 }
1967 }
1968}
1969
1970impl From<golem_wasm_rpc::WorkerId> for WorkerId {
1971 fn from(host: golem_wasm_rpc::WorkerId) -> Self {
1972 Self {
1973 component_id: host.component_id.into(),
1974 worker_name: host.worker_name,
1975 }
1976 }
1977}
1978
1979impl From<golem_wasm_rpc::ComponentId> for ComponentId {
1980 fn from(host: golem_wasm_rpc::ComponentId) -> Self {
1981 let high_bits = host.uuid.high_bits;
1982 let low_bits = host.uuid.low_bits;
1983
1984 Self(Uuid::from_u64_pair(high_bits, low_bits))
1985 }
1986}
1987
1988impl From<ComponentId> for golem_wasm_rpc::ComponentId {
1989 fn from(component_id: ComponentId) -> Self {
1990 let (high_bits, low_bits) = component_id.0.as_u64_pair();
1991
1992 golem_wasm_rpc::ComponentId {
1993 uuid: golem_wasm_rpc::Uuid {
1994 high_bits,
1995 low_bits,
1996 },
1997 }
1998 }
1999}
2000
2001#[cfg(test)]
2002mod tests {
2003 use std::collections::HashSet;
2004 use std::str::FromStr;
2005 use std::time::SystemTime;
2006 use std::vec;
2007 use test_r::test;
2008 use tracing::info;
2009
2010 use crate::model::oplog::OplogIndex;
2011
2012 use crate::model::{
2013 AccountId, ComponentFilePath, ComponentId, FilterComparator, IdempotencyKey, ProjectId,
2014 ShardId, StringFilterComparator, TargetWorkerId, Timestamp, WorkerFilter, WorkerId,
2015 WorkerMetadata, WorkerStatus, WorkerStatusRecord,
2016 };
2017 use bincode::{Decode, Encode};
2018
2019 use rand::{rng, Rng};
2020 use serde::{Deserialize, Serialize};
2021
2022 #[test]
2023 fn timestamp_conversion() {
2024 let ts: Timestamp = Timestamp::now_utc();
2025
2026 let prost_ts: prost_types::Timestamp = ts.into();
2027
2028 let ts2: Timestamp = prost_ts.into();
2029
2030 assert_eq!(ts2, ts);
2031 }
2032
2033 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
2034 struct ExampleWithAccountId {
2035 account_id: AccountId,
2036 }
2037
2038 #[test]
2039 fn account_id_from_json_apigateway_version() {
2040 let json = "{ \"account_id\": \"account-1\" }";
2041 let example: ExampleWithAccountId = serde_json::from_str(json).unwrap();
2042 assert_eq!(
2043 example.account_id,
2044 AccountId {
2045 value: "account-1".to_string()
2046 }
2047 );
2048 }
2049
2050 #[test]
2051 fn account_id_json_serialization() {
2052 let example: ExampleWithAccountId = ExampleWithAccountId {
2054 account_id: AccountId {
2055 value: "account-1".to_string(),
2056 },
2057 };
2058 let json = serde_json::to_string(&example).unwrap();
2059 assert_eq!(json, "{\"account_id\":\"account-1\"}");
2060 }
2061
2062 #[test]
2063 fn worker_filter_parse() {
2064 assert_eq!(
2065 WorkerFilter::from_str(" name = worker-1").unwrap(),
2066 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2067 );
2068
2069 assert_eq!(
2070 WorkerFilter::from_str("status == Running").unwrap(),
2071 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2072 );
2073
2074 assert_eq!(
2075 WorkerFilter::from_str("version >= 10").unwrap(),
2076 WorkerFilter::new_version(FilterComparator::GreaterEqual, 10)
2077 );
2078
2079 assert_eq!(
2080 WorkerFilter::from_str("env.tag1 == abc ").unwrap(),
2081 WorkerFilter::new_env(
2082 "tag1".to_string(),
2083 StringFilterComparator::Equal,
2084 "abc".to_string(),
2085 )
2086 );
2087 }
2088
2089 #[test]
2090 fn worker_filter_combination() {
2091 assert_eq!(
2092 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).not(),
2093 WorkerFilter::new_not(WorkerFilter::new_name(
2094 StringFilterComparator::Equal,
2095 "worker-1".to_string(),
2096 ))
2097 );
2098
2099 assert_eq!(
2100 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).and(
2101 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2102 ),
2103 WorkerFilter::new_and(vec![
2104 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2105 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2106 ])
2107 );
2108
2109 assert_eq!(
2110 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2111 .and(WorkerFilter::new_status(
2112 FilterComparator::Equal,
2113 WorkerStatus::Running,
2114 ))
2115 .and(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2116 WorkerFilter::new_and(vec![
2117 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2118 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2119 WorkerFilter::new_version(FilterComparator::Equal, 1),
2120 ])
2121 );
2122
2123 assert_eq!(
2124 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).or(
2125 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2126 ),
2127 WorkerFilter::new_or(vec![
2128 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2129 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2130 ])
2131 );
2132
2133 assert_eq!(
2134 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2135 .or(WorkerFilter::new_status(
2136 FilterComparator::NotEqual,
2137 WorkerStatus::Running,
2138 ))
2139 .or(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2140 WorkerFilter::new_or(vec![
2141 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2142 WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2143 WorkerFilter::new_version(FilterComparator::Equal, 1),
2144 ])
2145 );
2146
2147 assert_eq!(
2148 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2149 .and(WorkerFilter::new_status(
2150 FilterComparator::NotEqual,
2151 WorkerStatus::Running,
2152 ))
2153 .or(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2154 WorkerFilter::new_or(vec![
2155 WorkerFilter::new_and(vec![
2156 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2157 WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2158 ]),
2159 WorkerFilter::new_version(FilterComparator::Equal, 1),
2160 ])
2161 );
2162
2163 assert_eq!(
2164 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2165 .or(WorkerFilter::new_status(
2166 FilterComparator::NotEqual,
2167 WorkerStatus::Running,
2168 ))
2169 .and(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2170 WorkerFilter::new_and(vec![
2171 WorkerFilter::new_or(vec![
2172 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2173 WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2174 ]),
2175 WorkerFilter::new_version(FilterComparator::Equal, 1),
2176 ])
2177 );
2178 }
2179
2180 #[test]
2181 fn worker_filter_matches() {
2182 let component_id = ComponentId::new_v4();
2183 let worker_metadata = WorkerMetadata {
2184 worker_id: WorkerId {
2185 worker_name: "worker-1".to_string(),
2186 component_id,
2187 },
2188 args: vec![],
2189 env: vec![
2190 ("env1".to_string(), "value1".to_string()),
2191 ("env2".to_string(), "value2".to_string()),
2192 ],
2193 project_id: ProjectId::new_v4(),
2194 created_by: AccountId {
2195 value: "account-1".to_string(),
2196 },
2197 created_at: Timestamp::now_utc(),
2198 parent: None,
2199 last_known_status: WorkerStatusRecord {
2200 component_version: 1,
2201 ..WorkerStatusRecord::default()
2202 },
2203 };
2204
2205 assert!(
2206 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2207 .and(WorkerFilter::new_status(
2208 FilterComparator::Equal,
2209 WorkerStatus::Idle,
2210 ))
2211 .matches(&worker_metadata)
2212 );
2213
2214 assert!(WorkerFilter::new_env(
2215 "env1".to_string(),
2216 StringFilterComparator::Equal,
2217 "value1".to_string(),
2218 )
2219 .and(WorkerFilter::new_status(
2220 FilterComparator::Equal,
2221 WorkerStatus::Idle,
2222 ))
2223 .matches(&worker_metadata));
2224
2225 assert!(WorkerFilter::new_env(
2226 "env1".to_string(),
2227 StringFilterComparator::Equal,
2228 "value2".to_string(),
2229 )
2230 .not()
2231 .and(
2232 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running).or(
2233 WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Idle)
2234 )
2235 )
2236 .matches(&worker_metadata));
2237
2238 assert!(
2239 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2240 .and(WorkerFilter::new_version(FilterComparator::Equal, 1))
2241 .matches(&worker_metadata)
2242 );
2243
2244 assert!(
2245 WorkerFilter::new_name(StringFilterComparator::Equal, "worker-2".to_string())
2246 .or(WorkerFilter::new_version(FilterComparator::Equal, 1))
2247 .matches(&worker_metadata)
2248 );
2249
2250 assert!(WorkerFilter::new_version(FilterComparator::GreaterEqual, 1)
2251 .and(WorkerFilter::new_version(FilterComparator::Less, 2))
2252 .or(WorkerFilter::new_name(
2253 StringFilterComparator::Equal,
2254 "worker-2".to_string(),
2255 ))
2256 .matches(&worker_metadata));
2257 }
2258
2259 #[test]
2260 fn target_worker_id_force_shards() {
2261 let mut rng = rng();
2262 const SHARD_COUNT: usize = 1000;
2263 const EXAMPLE_COUNT: usize = 1000;
2264 for _ in 0..EXAMPLE_COUNT {
2265 let mut shard_ids = HashSet::new();
2266 let count = rng.random_range(0..100);
2267 for _ in 0..count {
2268 let shard_id = rng.random_range(0..SHARD_COUNT);
2269 shard_ids.insert(ShardId {
2270 value: shard_id as i64,
2271 });
2272 }
2273
2274 let component_id = ComponentId::new_v4();
2275 let target_worker_id = TargetWorkerId {
2276 component_id,
2277 worker_name: None,
2278 };
2279
2280 let start = SystemTime::now();
2281 let worker_id = target_worker_id.into_worker_id(&shard_ids, SHARD_COUNT);
2282 let end = SystemTime::now();
2283 info!(
2284 "Time with {count} valid shards: {:?}",
2285 end.duration_since(start).unwrap()
2286 );
2287
2288 if !shard_ids.is_empty() {
2289 assert!(shard_ids.contains(&ShardId::from_worker_id(&worker_id, SHARD_COUNT)));
2290 }
2291 }
2292 }
2293
2294 #[test]
2295 fn derived_idempotency_key() {
2296 let base1 = IdempotencyKey::fresh();
2297 let base2 = IdempotencyKey::fresh();
2298 let base3 = IdempotencyKey {
2299 value: "base3".to_string(),
2300 };
2301
2302 assert_ne!(base1, base2);
2303
2304 let idx1 = OplogIndex::from_u64(2);
2305 let idx2 = OplogIndex::from_u64(11);
2306
2307 let derived11a = IdempotencyKey::derived(&base1, idx1);
2308 let derived12a = IdempotencyKey::derived(&base1, idx2);
2309 let derived21a = IdempotencyKey::derived(&base2, idx1);
2310 let derived22a = IdempotencyKey::derived(&base2, idx2);
2311
2312 let derived11b = IdempotencyKey::derived(&base1, idx1);
2313 let derived12b = IdempotencyKey::derived(&base1, idx2);
2314 let derived21b = IdempotencyKey::derived(&base2, idx1);
2315 let derived22b = IdempotencyKey::derived(&base2, idx2);
2316
2317 let derived31 = IdempotencyKey::derived(&base3, idx1);
2318 let derived32 = IdempotencyKey::derived(&base3, idx2);
2319
2320 assert_eq!(derived11a, derived11b);
2321 assert_eq!(derived12a, derived12b);
2322 assert_eq!(derived21a, derived21b);
2323 assert_eq!(derived22a, derived22b);
2324
2325 assert_ne!(derived11a, derived12a);
2326 assert_ne!(derived11a, derived21a);
2327 assert_ne!(derived11a, derived22a);
2328 assert_ne!(derived12a, derived21a);
2329 assert_ne!(derived12a, derived22a);
2330 assert_ne!(derived21a, derived22a);
2331
2332 assert_ne!(derived11a, derived31);
2333 assert_ne!(derived21a, derived31);
2334 assert_ne!(derived12a, derived32);
2335 assert_ne!(derived22a, derived32);
2336 assert_ne!(derived31, derived32);
2337 }
2338
2339 #[test]
2340 fn initial_component_file_path_from_absolute() {
2341 let path = ComponentFilePath::from_abs_str("/a/b/c").unwrap();
2342 assert_eq!(path.to_string(), "/a/b/c");
2343 }
2344
2345 #[test]
2346 fn initial_component_file_path_from_relative() {
2347 let path = ComponentFilePath::from_abs_str("a/b/c");
2348 assert!(path.is_err());
2349 }
2350}