1use crate::model::oplog::{
16 IndexedResourceKey, OplogEntry, TimestampedUpdateDescription, WorkerResourceId,
17};
18use crate::model::regions::DeletedRegions;
19use bincode::de::{BorrowDecoder, Decoder};
20use bincode::enc::Encoder;
21use bincode::error::{DecodeError, EncodeError};
22use bincode::{BorrowDecode, Decode, Encode};
23
24pub use crate::base_model::*;
25use crate::model::invocation_context::InvocationContextStack;
26use golem_wasm_ast::analysis::analysed_type::{field, list, record, str, tuple, u32, u64};
27use golem_wasm_ast::analysis::AnalysedType;
28use golem_wasm_rpc::{IntoValue, Value};
29use golem_wasm_rpc_derive::IntoValue;
30use http::Uri;
31use rand::prelude::IteratorRandom;
32use serde::de::Unexpected;
33use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
34use std::cmp::Ordering;
35use std::collections::{HashMap, HashSet, VecDeque};
36use std::fmt::{Display, Formatter};
37use std::ops::Add;
38use std::str::FromStr;
39use std::time::{Duration, SystemTime};
40use typed_path::Utf8UnixPathBuf;
41use uuid::{uuid, Uuid};
42
43pub mod auth;
44pub mod base64;
45pub mod component;
46pub mod component_constraint;
47pub mod component_metadata;
48pub mod config;
49pub mod error;
50pub mod exports;
51pub mod invocation_context;
52pub mod lucene;
53pub mod oplog;
54pub mod plugin;
55pub mod project;
56pub mod public_oplog;
57pub mod regions;
58pub mod trim_date;
59
60#[cfg(feature = "poem")]
61mod poem;
62
63#[cfg(feature = "protobuf")]
64pub mod protobuf;
65
/// Marker trait bundling the `poem_openapi` traits `Type`, `ParseFromJSON` and `ToJSON`.
///
/// This variant is compiled only when the `poem` feature is enabled.
#[cfg(feature = "poem")]
pub trait PoemTypeRequirements:
    poem_openapi::types::Type + poem_openapi::types::ParseFromJSON + poem_openapi::types::ToJSON
{
}

/// Marker trait without any requirements, used when the `poem` feature is disabled.
#[cfg(not(feature = "poem"))]
pub trait PoemTypeRequirements {}

/// Blanket implementation for every type that satisfies the three `poem_openapi` traits.
#[cfg(feature = "poem")]
impl<T> PoemTypeRequirements for T where
    T: poem_openapi::types::Type
        + poem_openapi::types::ParseFromJSON
        + poem_openapi::types::ToJSON
{
}

/// Blanket implementation for every type when the `poem` feature is disabled.
#[cfg(not(feature = "poem"))]
impl<T> PoemTypeRequirements for T {}
86
/// Marker trait requiring `poem_openapi::types::ParseFromMultipartField`.
///
/// This variant is compiled only when the `poem` feature is enabled.
#[cfg(feature = "poem")]
pub trait PoemMultipartTypeRequirements: poem_openapi::types::ParseFromMultipartField {}

/// Marker trait without any requirements, used when the `poem` feature is disabled.
#[cfg(not(feature = "poem"))]
pub trait PoemMultipartTypeRequirements {}

/// Blanket implementation for every type that implements `ParseFromMultipartField`.
#[cfg(feature = "poem")]
impl<T> PoemMultipartTypeRequirements for T where T: poem_openapi::types::ParseFromMultipartField {}

/// Blanket implementation for every type when the `poem` feature is disabled.
#[cfg(not(feature = "poem"))]
impl<T> PoemMultipartTypeRequirements for T {}
98
99#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
100#[repr(transparent)]
101pub struct Timestamp(iso8601_timestamp::Timestamp);
102
103impl Timestamp {
104 pub fn now_utc() -> Timestamp {
105 Timestamp(iso8601_timestamp::Timestamp::now_utc())
106 }
107
108 pub fn to_millis(&self) -> u64 {
109 self.0
110 .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
111 .whole_milliseconds() as u64
112 }
113}
114
115impl Display for Timestamp {
116 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
117 write!(f, "{}", self.0)
118 }
119}
120
121impl FromStr for Timestamp {
122 type Err = String;
123
124 fn from_str(s: &str) -> Result<Self, Self::Err> {
125 match iso8601_timestamp::Timestamp::parse(s) {
126 Some(ts) => Ok(Self(ts)),
127 None => Err("Invalid timestamp".to_string()),
128 }
129 }
130}
131
132impl serde::Serialize for Timestamp {
133 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
134 where
135 S: Serializer,
136 {
137 self.0.serialize(serializer)
138 }
139}
140
141impl<'de> serde::Deserialize<'de> for Timestamp {
142 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
143 where
144 D: Deserializer<'de>,
145 {
146 if deserializer.is_human_readable() {
147 iso8601_timestamp::Timestamp::deserialize(deserializer).map(Self)
148 } else {
149 let timestamp = i64::deserialize(deserializer)?;
151 Ok(Timestamp(
152 iso8601_timestamp::Timestamp::UNIX_EPOCH
153 .add(Duration::from_millis(timestamp as u64)),
154 ))
155 }
156 }
157}
158
159impl bincode::Encode for Timestamp {
160 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
161 (self
162 .0
163 .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
164 .whole_milliseconds() as i64)
165 .encode(encoder)
166 }
167}
168
169impl<Context> bincode::Decode<Context> for Timestamp {
170 fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
171 let timestamp: i64 = bincode::Decode::decode(decoder)?;
172 Ok(Timestamp(
173 iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
174 ))
175 }
176}
177
178impl<'de, Context> bincode::BorrowDecode<'de, Context> for Timestamp {
179 fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
180 decoder: &mut D,
181 ) -> Result<Self, DecodeError> {
182 let timestamp: i64 = bincode::BorrowDecode::borrow_decode(decoder)?;
183 Ok(Timestamp(
184 iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
185 ))
186 }
187}
188
189impl From<u64> for Timestamp {
190 fn from(value: u64) -> Self {
191 Timestamp(iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(value)))
192 }
193}
194
195impl IntoValue for Timestamp {
196 fn into_value(self) -> Value {
197 let d = self
198 .0
199 .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH);
200 Value::Record(vec![
201 Value::U64(d.whole_seconds() as u64),
202 Value::U32(d.subsec_nanoseconds() as u32),
203 ])
204 }
205
206 fn get_type() -> AnalysedType {
207 record(vec![field("seconds", u64()), field("nanoseconds", u32())])
208 }
209}
210
211#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode)]
213pub struct OwnedWorkerId {
214 pub account_id: AccountId,
215 pub worker_id: WorkerId,
216}
217
218impl OwnedWorkerId {
219 pub fn new(account_id: &AccountId, worker_id: &WorkerId) -> Self {
220 Self {
221 account_id: account_id.clone(),
222 worker_id: worker_id.clone(),
223 }
224 }
225
226 pub fn worker_id(&self) -> WorkerId {
227 self.worker_id.clone()
228 }
229
230 pub fn account_id(&self) -> AccountId {
231 self.account_id.clone()
232 }
233
234 pub fn component_id(&self) -> ComponentId {
235 self.worker_id.component_id.clone()
236 }
237
238 pub fn worker_name(&self) -> String {
239 self.worker_id.worker_name.clone()
240 }
241}
242
243impl Display for OwnedWorkerId {
244 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
245 write!(f, "{}/{}", self.account_id, self.worker_id)
246 }
247}
248
249impl AsRef<WorkerId> for OwnedWorkerId {
250 fn as_ref(&self) -> &WorkerId {
251 &self.worker_id
252 }
253}
254
/// An action stored in a [`ScheduleId`] together with a timestamp.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub enum ScheduledAction {
    /// Displayed as `complete[<promise_id>]`; carries the account and the promise.
    CompletePromise {
        account_id: AccountId,
        promise_id: PromiseId,
    },
    /// Displayed as `archive[<owned_worker_id>]`.
    ArchiveOplog {
        owned_worker_id: OwnedWorkerId,
        last_oplog_index: OplogIndex,
        next_after: Duration,
    },
    /// Displayed as `invoke[<owned_worker_id>]`; carries the same payload fields as
    /// `WorkerInvocation::ExportedFunction` plus the target worker.
    Invoke {
        owned_worker_id: OwnedWorkerId,
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
281
282impl ScheduledAction {
283 pub fn owned_worker_id(&self) -> OwnedWorkerId {
284 match self {
285 ScheduledAction::CompletePromise {
286 account_id,
287 promise_id,
288 } => OwnedWorkerId::new(account_id, &promise_id.worker_id),
289 ScheduledAction::ArchiveOplog {
290 owned_worker_id, ..
291 } => owned_worker_id.clone(),
292 ScheduledAction::Invoke {
293 owned_worker_id, ..
294 } => owned_worker_id.clone(),
295 }
296 }
297}
298
299impl Display for ScheduledAction {
300 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
301 match self {
302 ScheduledAction::CompletePromise { promise_id, .. } => {
303 write!(f, "complete[{}]", promise_id)
304 }
305 ScheduledAction::ArchiveOplog {
306 owned_worker_id, ..
307 } => {
308 write!(f, "archive[{}]", owned_worker_id)
309 }
310 ScheduledAction::Invoke {
311 owned_worker_id, ..
312 } => write!(f, "invoke[{}]", owned_worker_id),
313 }
314 }
315}
316
317#[derive(Debug, Clone, Encode, Decode)]
318pub struct ScheduleId {
319 pub timestamp: i64,
320 pub action: ScheduledAction,
321}
322
323impl Display for ScheduleId {
324 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
325 write!(f, "{}@{}", self.action, self.timestamp)
326 }
327}
328
/// Wrapper around the shard count; `RoutingTable::lookup` passes `value` to
/// `ShardId::from_worker_id`.
#[derive(Clone)]
pub struct NumberOfShards {
    pub value: usize,
}
333
334#[derive(Clone, Debug, PartialEq, Eq, Hash)]
335pub struct Pod {
336 host: String,
337 port: u16,
338}
339
340impl Pod {
341 pub fn uri(&self) -> Uri {
342 Uri::builder()
343 .scheme("http")
344 .authority(format!("{}:{}", self.host, self.port).as_str())
345 .path_and_query("/")
346 .build()
347 .expect("Failed to build URI")
348 }
349}
350
351#[derive(Clone)]
352pub struct RoutingTable {
353 pub number_of_shards: NumberOfShards,
354 shard_assignments: HashMap<ShardId, Pod>,
355}
356
357impl RoutingTable {
358 pub fn lookup(&self, worker_id: &WorkerId) -> Option<&Pod> {
359 self.shard_assignments.get(&ShardId::from_worker_id(
360 &worker_id.clone(),
361 self.number_of_shards.value,
362 ))
363 }
364
365 pub fn random(&self) -> Option<&Pod> {
366 self.shard_assignments.values().choose(&mut rand::rng())
367 }
368
369 pub fn first(&self) -> Option<&Pod> {
370 self.shard_assignments.values().next()
371 }
372
373 pub fn all(&self) -> HashSet<&Pod> {
374 self.shard_assignments.values().collect()
375 }
376}
377
/// A single shard-to-pod pairing.
// `dead_code` is allowed because nothing in the visible part of this file reads the fields.
#[allow(dead_code)]
pub struct RoutingTableEntry {
    shard_id: ShardId,
    pod: Pod,
}
383
384#[derive(Clone, Debug, Default)]
385pub struct ShardAssignment {
386 pub number_of_shards: usize,
387 pub shard_ids: HashSet<ShardId>,
388}
389
390impl ShardAssignment {
391 pub fn new(number_of_shards: usize, shard_ids: HashSet<ShardId>) -> Self {
392 Self {
393 number_of_shards,
394 shard_ids,
395 }
396 }
397
398 pub fn assign_shards(&mut self, shard_ids: &HashSet<ShardId>) {
399 for shard_id in shard_ids {
400 self.shard_ids.insert(*shard_id);
401 }
402 }
403
404 pub fn register(&mut self, number_of_shards: usize, shard_ids: &HashSet<ShardId>) {
405 self.number_of_shards = number_of_shards;
406 for shard_id in shard_ids {
407 self.shard_ids.insert(*shard_id);
408 }
409 }
410
411 pub fn revoke_shards(&mut self, shard_ids: &HashSet<ShardId>) {
412 for shard_id in shard_ids {
413 self.shard_ids.remove(shard_id);
414 }
415 }
416}
417
418impl Display for ShardAssignment {
419 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
420 let shard_ids = self
421 .shard_ids
422 .iter()
423 .map(|shard_id| shard_id.to_string())
424 .collect::<Vec<_>>()
425 .join(",");
426 write!(
427 f,
428 "{{ number_of_shards: {}, shard_ids: {} }}",
429 self.number_of_shards, shard_ids
430 )
431 }
432}
433
434#[derive(Clone, Debug, Encode, Decode, Eq, Hash, PartialEq, IntoValue)]
435#[wit_transparent]
436pub struct IdempotencyKey {
437 pub value: String,
438}
439
440impl IdempotencyKey {
441 const ROOT_NS: Uuid = uuid!("9C19B15A-C83D-46F7-9BC3-EAD7923733F4");
442
443 pub fn new(value: String) -> Self {
444 Self { value }
445 }
446
447 pub fn from_uuid(value: Uuid) -> Self {
448 Self {
449 value: value.to_string(),
450 }
451 }
452
453 pub fn fresh() -> Self {
454 Self::from_uuid(Uuid::new_v4())
455 }
456
457 pub fn derived(base: &IdempotencyKey, oplog_index: OplogIndex) -> Self {
467 let namespace = if let Ok(base_uuid) = Uuid::parse_str(&base.value) {
468 base_uuid
469 } else {
470 Uuid::new_v5(&Self::ROOT_NS, base.value.as_bytes())
471 };
472 let name = format!("oplog-index-{}", oplog_index);
473 Self::from_uuid(Uuid::new_v5(&namespace, name.as_bytes()))
474 }
475}
476
477impl Serialize for IdempotencyKey {
478 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
479 where
480 S: Serializer,
481 {
482 self.value.serialize(serializer)
483 }
484}
485
486impl<'de> Deserialize<'de> for IdempotencyKey {
487 fn deserialize<D>(deserializer: D) -> Result<IdempotencyKey, D::Error>
488 where
489 D: Deserializer<'de>,
490 {
491 let value = String::deserialize(deserializer)?;
492 Ok(IdempotencyKey { value })
493 }
494}
495
496impl Display for IdempotencyKey {
497 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
498 write!(f, "{}", self.value)
499 }
500}
501
502#[derive(Clone, Debug)]
503pub struct WorkerMetadata {
504 pub worker_id: WorkerId,
505 pub args: Vec<String>,
506 pub env: Vec<(String, String)>,
507 pub account_id: AccountId,
508 pub created_at: Timestamp,
509 pub parent: Option<WorkerId>,
510 pub last_known_status: WorkerStatusRecord,
511}
512
513impl WorkerMetadata {
514 pub fn default(worker_id: WorkerId, account_id: AccountId) -> WorkerMetadata {
515 WorkerMetadata {
516 worker_id,
517 args: vec![],
518 env: vec![],
519 account_id,
520 created_at: Timestamp::now_utc(),
521 parent: None,
522 last_known_status: WorkerStatusRecord::default(),
523 }
524 }
525
526 pub fn owned_worker_id(&self) -> OwnedWorkerId {
527 OwnedWorkerId::new(&self.account_id, &self.worker_id)
528 }
529}
530
531impl IntoValue for WorkerMetadata {
532 fn into_value(self) -> Value {
533 Value::Record(vec![
534 self.worker_id.into_value(),
535 self.args.into_value(),
536 self.env.into_value(),
537 self.last_known_status.status.into_value(),
538 self.last_known_status.component_version.into_value(),
539 0u64.into_value(), ])
541 }
542
543 fn get_type() -> AnalysedType {
544 record(vec![
545 field("worker-id", WorkerId::get_type()),
546 field("args", list(str())),
547 field("env", list(tuple(vec![str(), str()]))),
548 field("status", WorkerStatus::get_type()),
549 field("component-version", u64()),
550 field("retry-count", u64()),
551 ])
552 }
553}
554
/// Value type of `WorkerStatusRecord::owned_resources`: a creation timestamp and an
/// optional indexed resource key.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct WorkerResourceDescription {
    pub created_at: Timestamp,
    pub indexed_resource_key: Option<IndexedResourceKey>,
}
560
/// Retry parameters.
///
/// With serde, the two delays are (de)serialized through `humantime_serde`.
// NOTE(review): how these fields combine into an actual back-off schedule is not
// visible in this file — confirm against the retry implementation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
pub struct RetryConfig {
    pub max_attempts: u32,
    #[serde(with = "humantime_serde")]
    pub min_delay: Duration,
    #[serde(with = "humantime_serde")]
    pub max_delay: Duration,
    pub multiplier: f64,
    pub max_jitter_factor: Option<f64>,
}
571
/// Status information recorded for a worker.
///
/// `Encode` is derived, while `Decode` and `BorrowDecode` are implemented by hand
/// elsewhere in this file and read the fields in the order declared here. Keep the
/// field order and those implementations in sync.
#[derive(Clone, Debug, PartialEq, Encode)]
pub struct WorkerStatusRecord {
    pub status: WorkerStatus,
    pub skipped_regions: DeletedRegions,
    pub overridden_retry_config: Option<RetryConfig>,
    pub pending_invocations: Vec<TimestampedWorkerInvocation>,
    pub pending_updates: VecDeque<TimestampedUpdateDescription>,
    pub failed_updates: Vec<FailedUpdateRecord>,
    pub successful_updates: Vec<SuccessfulUpdateRecord>,
    pub invocation_results: HashMap<IdempotencyKey, OplogIndex>,
    pub current_idempotency_key: Option<IdempotencyKey>,
    pub component_version: ComponentVersion,
    pub component_size: u64,
    pub total_linear_memory_size: u64,
    pub owned_resources: HashMap<WorkerResourceId, WorkerResourceDescription>,
    pub oplog_idx: OplogIndex,
    pub active_plugins: HashSet<PluginInstallationId>,
    pub deleted_regions: DeletedRegions,
    pub component_version_for_replay: ComponentVersion,
}
599
/// Hand-written decoder for [`WorkerStatusRecord`].
///
/// Every field is read from the decoder in struct declaration order; any decode
/// error aborts immediately via `?`.
// NOTE(review): presumably this mirrors the field order written by the derived
// `Encode` — keep both in sync when adding or reordering fields.
impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
        Ok(Self {
            status: Decode::decode(decoder)?,
            skipped_regions: Decode::decode(decoder)?,
            overridden_retry_config: Decode::decode(decoder)?,
            pending_invocations: Decode::decode(decoder)?,
            pending_updates: Decode::decode(decoder)?,
            failed_updates: Decode::decode(decoder)?,
            successful_updates: Decode::decode(decoder)?,
            invocation_results: Decode::decode(decoder)?,
            current_idempotency_key: Decode::decode(decoder)?,
            component_version: Decode::decode(decoder)?,
            component_size: Decode::decode(decoder)?,
            total_linear_memory_size: Decode::decode(decoder)?,
            owned_resources: Decode::decode(decoder)?,
            oplog_idx: Decode::decode(decoder)?,
            active_plugins: Decode::decode(decoder)?,
            deleted_regions: Decode::decode(decoder)?,
            component_version_for_replay: Decode::decode(decoder)?,
        })
    }
}
/// Hand-written borrow-decoder for [`WorkerStatusRecord`].
///
/// Reads the same fields in the same order as the `Decode` implementation, using
/// `BorrowDecode::borrow_decode` for each one.
impl<'de, Context> BorrowDecode<'de, Context> for WorkerStatusRecord {
    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
        decoder: &mut D,
    ) -> Result<Self, DecodeError> {
        Ok(Self {
            status: BorrowDecode::borrow_decode(decoder)?,
            skipped_regions: BorrowDecode::borrow_decode(decoder)?,
            overridden_retry_config: BorrowDecode::borrow_decode(decoder)?,
            pending_invocations: BorrowDecode::borrow_decode(decoder)?,
            pending_updates: BorrowDecode::borrow_decode(decoder)?,
            failed_updates: BorrowDecode::borrow_decode(decoder)?,
            successful_updates: BorrowDecode::borrow_decode(decoder)?,
            invocation_results: BorrowDecode::borrow_decode(decoder)?,
            current_idempotency_key: BorrowDecode::borrow_decode(decoder)?,
            component_version: BorrowDecode::borrow_decode(decoder)?,
            component_size: BorrowDecode::borrow_decode(decoder)?,
            total_linear_memory_size: BorrowDecode::borrow_decode(decoder)?,
            owned_resources: BorrowDecode::borrow_decode(decoder)?,
            oplog_idx: BorrowDecode::borrow_decode(decoder)?,
            active_plugins: BorrowDecode::borrow_decode(decoder)?,
            deleted_regions: BorrowDecode::borrow_decode(decoder)?,
            component_version_for_replay: BorrowDecode::borrow_decode(decoder)?,
        })
    }
}
648
649impl Default for WorkerStatusRecord {
650 fn default() -> Self {
651 WorkerStatusRecord {
652 status: WorkerStatus::Idle,
653 skipped_regions: DeletedRegions::new(),
654 overridden_retry_config: None,
655 pending_invocations: Vec::new(),
656 pending_updates: VecDeque::new(),
657 failed_updates: Vec::new(),
658 successful_updates: Vec::new(),
659 invocation_results: HashMap::new(),
660 current_idempotency_key: None,
661 component_version: 0,
662 component_size: 0,
663 total_linear_memory_size: 0,
664 owned_resources: HashMap::new(),
665 oplog_idx: OplogIndex::default(),
666 active_plugins: HashSet::new(),
667 deleted_regions: DeletedRegions::new(),
668 component_version_for_replay: 0,
669 }
670 }
671}
672
/// Element type of `WorkerStatusRecord::failed_updates`: a timestamp, the target
/// component version, and optional details text.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct FailedUpdateRecord {
    pub timestamp: Timestamp,
    pub target_version: ComponentVersion,
    pub details: Option<String>,
}
679
/// Element type of `WorkerStatusRecord::successful_updates`: a timestamp and the
/// target component version.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct SuccessfulUpdateRecord {
    pub timestamp: Timestamp,
    pub target_version: ComponentVersion,
}
685
/// Status of a worker.
///
/// The `i32` conversions in this file map the variants to `0..=6` in declaration
/// order, and the `Ord` implementation compares those integers.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum WorkerStatus {
    Running,
    Idle,
    Suspended,
    Interrupted,
    Retrying,
    Failed,
    Exited,
}
708
709impl PartialOrd for WorkerStatus {
710 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
711 Some(self.cmp(other))
712 }
713}
714
715impl Ord for WorkerStatus {
716 fn cmp(&self, other: &Self) -> Ordering {
717 let v1: i32 = self.clone().into();
718 let v2: i32 = other.clone().into();
719 v1.cmp(&v2)
720 }
721}
722
723impl FromStr for WorkerStatus {
724 type Err = String;
725
726 fn from_str(s: &str) -> Result<Self, Self::Err> {
727 match s.to_lowercase().as_str() {
728 "running" => Ok(WorkerStatus::Running),
729 "idle" => Ok(WorkerStatus::Idle),
730 "suspended" => Ok(WorkerStatus::Suspended),
731 "interrupted" => Ok(WorkerStatus::Interrupted),
732 "retrying" => Ok(WorkerStatus::Retrying),
733 "failed" => Ok(WorkerStatus::Failed),
734 "exited" => Ok(WorkerStatus::Exited),
735 _ => Err(format!("Unknown worker status: {}", s)),
736 }
737 }
738}
739
740impl Display for WorkerStatus {
741 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
742 match self {
743 WorkerStatus::Running => write!(f, "Running"),
744 WorkerStatus::Idle => write!(f, "Idle"),
745 WorkerStatus::Suspended => write!(f, "Suspended"),
746 WorkerStatus::Interrupted => write!(f, "Interrupted"),
747 WorkerStatus::Retrying => write!(f, "Retrying"),
748 WorkerStatus::Failed => write!(f, "Failed"),
749 WorkerStatus::Exited => write!(f, "Exited"),
750 }
751 }
752}
753
754impl TryFrom<i32> for WorkerStatus {
755 type Error = String;
756
757 fn try_from(value: i32) -> Result<Self, Self::Error> {
758 match value {
759 0 => Ok(WorkerStatus::Running),
760 1 => Ok(WorkerStatus::Idle),
761 2 => Ok(WorkerStatus::Suspended),
762 3 => Ok(WorkerStatus::Interrupted),
763 4 => Ok(WorkerStatus::Retrying),
764 5 => Ok(WorkerStatus::Failed),
765 6 => Ok(WorkerStatus::Exited),
766 _ => Err(format!("Unknown worker status: {}", value)),
767 }
768 }
769}
770
771impl From<WorkerStatus> for i32 {
772 fn from(value: WorkerStatus) -> Self {
773 match value {
774 WorkerStatus::Running => 0,
775 WorkerStatus::Idle => 1,
776 WorkerStatus::Suspended => 2,
777 WorkerStatus::Interrupted => 3,
778 WorkerStatus::Retrying => 4,
779 WorkerStatus::Failed => 5,
780 WorkerStatus::Exited => 6,
781 }
782 }
783}
784
/// Private wire representation of [`WorkerInvocation`] used by its bincode impls.
///
/// The variant order is part of the derived encoding, so it must not be changed.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
enum SerializedWorkerInvocation {
    /// Variant without an invocation context. It is only read, never written:
    /// converting it to `WorkerInvocation` supplies `InvocationContextStack::fresh()`.
    ExportedFunctionV1 {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
    },
    ManualUpdate {
        target_version: ComponentVersion,
    },
    /// Variant that carries the invocation context.
    ExportedFunction {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
804
805impl From<WorkerInvocation> for SerializedWorkerInvocation {
806 fn from(value: WorkerInvocation) -> Self {
807 match value {
808 WorkerInvocation::ManualUpdate { target_version } => {
809 Self::ManualUpdate { target_version }
810 }
811 WorkerInvocation::ExportedFunction {
812 idempotency_key,
813 full_function_name,
814 function_input,
815 invocation_context,
816 } => Self::ExportedFunction {
817 idempotency_key,
818 full_function_name,
819 function_input,
820 invocation_context,
821 },
822 }
823 }
824}
825
826impl From<SerializedWorkerInvocation> for WorkerInvocation {
827 fn from(value: SerializedWorkerInvocation) -> Self {
828 match value {
829 SerializedWorkerInvocation::ExportedFunctionV1 {
830 idempotency_key,
831 full_function_name,
832 function_input,
833 } => Self::ExportedFunction {
834 idempotency_key,
835 full_function_name,
836 function_input,
837 invocation_context: InvocationContextStack::fresh(),
838 },
839 SerializedWorkerInvocation::ManualUpdate { target_version } => {
840 Self::ManualUpdate { target_version }
841 }
842 SerializedWorkerInvocation::ExportedFunction {
843 idempotency_key,
844 full_function_name,
845 function_input,
846 invocation_context,
847 } => Self::ExportedFunction {
848 idempotency_key,
849 full_function_name,
850 function_input,
851 invocation_context,
852 },
853 }
854 }
855}
856
/// An invocation of a worker.
///
/// Bincode encoding and decoding go through the private `SerializedWorkerInvocation`.
#[derive(Clone, Debug, PartialEq)]
pub enum WorkerInvocation {
    /// Carries only the target component version.
    ManualUpdate {
        target_version: ComponentVersion,
    },
    /// Carries an idempotency key, the function name, its input values, and an
    /// invocation context.
    ExportedFunction {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
869
870impl WorkerInvocation {
871 pub fn is_idempotency_key(&self, key: &IdempotencyKey) -> bool {
872 match self {
873 Self::ExportedFunction {
874 idempotency_key, ..
875 } => idempotency_key == key,
876 _ => false,
877 }
878 }
879
880 pub fn idempotency_key(&self) -> Option<&IdempotencyKey> {
881 match self {
882 Self::ExportedFunction {
883 idempotency_key, ..
884 } => Some(idempotency_key),
885 _ => None,
886 }
887 }
888
889 pub fn invocation_context(&self) -> InvocationContextStack {
890 match self {
891 Self::ExportedFunction {
892 invocation_context, ..
893 } => invocation_context.clone(),
894 _ => InvocationContextStack::fresh(),
895 }
896 }
897}
898
899impl Encode for WorkerInvocation {
900 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
901 let serialized: SerializedWorkerInvocation = self.clone().into();
902 serialized.encode(encoder)
903 }
904}
905
906impl<Context> Decode<Context> for WorkerInvocation {
907 fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
908 let serialized: SerializedWorkerInvocation = Decode::decode(decoder)?;
909 Ok(serialized.into())
910 }
911}
912
913impl<'de, Context> BorrowDecode<'de, Context> for WorkerInvocation {
914 fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
915 decoder: &mut D,
916 ) -> Result<Self, DecodeError> {
917 let serialized: SerializedWorkerInvocation = BorrowDecode::borrow_decode(decoder)?;
918 Ok(serialized.into())
919 }
920}
921
/// A [`WorkerInvocation`] paired with a [`Timestamp`]; the element type of
/// `WorkerStatusRecord::pending_invocations`.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct TimestampedWorkerInvocation {
    pub timestamp: Timestamp,
    pub invocation: WorkerInvocation,
}
927
928#[derive(
929 Clone,
930 Debug,
931 PartialOrd,
932 Ord,
933 derive_more::FromStr,
934 Eq,
935 Hash,
936 PartialEq,
937 Serialize,
938 Deserialize,
939 Encode,
940 Decode,
941 IntoValue,
942)]
943#[serde(transparent)]
944pub struct AccountId {
945 pub value: String,
946}
947
948impl AccountId {
949 pub fn placeholder() -> Self {
950 Self {
951 value: "-1".to_string(),
952 }
953 }
954
955 pub fn generate() -> Self {
956 Self {
957 value: Uuid::new_v4().to_string(),
958 }
959 }
960}
961
962impl From<&str> for AccountId {
963 fn from(value: &str) -> Self {
964 Self {
965 value: value.to_string(),
966 }
967 }
968}
969
970impl Display for AccountId {
971 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
972 write!(f, "{}", &self.value)
973 }
974}
975
976#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
977#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
978#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
979#[serde(rename_all = "camelCase")]
980pub struct WorkerNameFilter {
981 pub comparator: StringFilterComparator,
982 pub value: String,
983}
984
985impl WorkerNameFilter {
986 pub fn new(comparator: StringFilterComparator, value: String) -> Self {
987 Self { comparator, value }
988 }
989}
990
991impl Display for WorkerNameFilter {
992 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
993 write!(f, "name {} {}", self.comparator, self.value)
994 }
995}
996
997#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
998#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
999#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1000#[serde(rename_all = "camelCase")]
1001pub struct WorkerStatusFilter {
1002 pub comparator: FilterComparator,
1003 pub value: WorkerStatus,
1004}
1005
1006impl WorkerStatusFilter {
1007 pub fn new(comparator: FilterComparator, value: WorkerStatus) -> Self {
1008 Self { comparator, value }
1009 }
1010}
1011
1012impl Display for WorkerStatusFilter {
1013 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1014 write!(f, "status == {:?}", self.value)
1015 }
1016}
1017
1018#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1019#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1020#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1021#[serde(rename_all = "camelCase")]
1022pub struct WorkerVersionFilter {
1023 pub comparator: FilterComparator,
1024 pub value: ComponentVersion,
1025}
1026
1027impl WorkerVersionFilter {
1028 pub fn new(comparator: FilterComparator, value: ComponentVersion) -> Self {
1029 Self { comparator, value }
1030 }
1031}
1032
1033impl Display for WorkerVersionFilter {
1034 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1035 write!(f, "version {} {}", self.comparator, self.value)
1036 }
1037}
1038
1039#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1040#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1041#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1042#[serde(rename_all = "camelCase")]
1043pub struct WorkerCreatedAtFilter {
1044 pub comparator: FilterComparator,
1045 pub value: Timestamp,
1046}
1047
1048impl WorkerCreatedAtFilter {
1049 pub fn new(comparator: FilterComparator, value: Timestamp) -> Self {
1050 Self { comparator, value }
1051 }
1052}
1053
1054impl Display for WorkerCreatedAtFilter {
1055 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1056 write!(f, "created_at {} {}", self.comparator, self.value)
1057 }
1058}
1059
1060#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1061#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1062#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1063#[serde(rename_all = "camelCase")]
1064pub struct WorkerEnvFilter {
1065 pub name: String,
1066 pub comparator: StringFilterComparator,
1067 pub value: String,
1068}
1069
1070impl WorkerEnvFilter {
1071 pub fn new(name: String, comparator: StringFilterComparator, value: String) -> Self {
1072 Self {
1073 name,
1074 comparator,
1075 value,
1076 }
1077 }
1078}
1079
1080impl Display for WorkerEnvFilter {
1081 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1082 write!(f, "env.{} {} {}", self.name, self.comparator, self.value)
1083 }
1084}
1085
1086#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1087#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1088#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1089#[serde(rename_all = "camelCase")]
1090pub struct WorkerAndFilter {
1091 pub filters: Vec<WorkerFilter>,
1092}
1093
1094impl WorkerAndFilter {
1095 pub fn new(filters: Vec<WorkerFilter>) -> Self {
1096 Self { filters }
1097 }
1098}
1099
1100impl Display for WorkerAndFilter {
1101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1102 write!(
1103 f,
1104 "({})",
1105 self.filters
1106 .iter()
1107 .map(|f| f.clone().to_string())
1108 .collect::<Vec<String>>()
1109 .join(" AND ")
1110 )
1111 }
1112}
1113
1114#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1115#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1116#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1117#[serde(rename_all = "camelCase")]
1118pub struct WorkerOrFilter {
1119 pub filters: Vec<WorkerFilter>,
1120}
1121
1122impl WorkerOrFilter {
1123 pub fn new(filters: Vec<WorkerFilter>) -> Self {
1124 Self { filters }
1125 }
1126}
1127
1128impl Display for WorkerOrFilter {
1129 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1130 write!(
1131 f,
1132 "({})",
1133 self.filters
1134 .iter()
1135 .map(|f| f.clone().to_string())
1136 .collect::<Vec<String>>()
1137 .join(" OR ")
1138 )
1139 }
1140}
1141
1142#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1143#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1144#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1145#[serde(rename_all = "camelCase")]
1146pub struct WorkerNotFilter {
1147 filter: Box<WorkerFilter>,
1148}
1149
1150impl WorkerNotFilter {
1151 pub fn new(filter: WorkerFilter) -> Self {
1152 Self {
1153 filter: Box::new(filter),
1154 }
1155 }
1156}
1157
1158impl Display for WorkerNotFilter {
1159 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1160 write!(f, "NOT ({})", self.filter)
1161 }
1162}
1163
/// A filter over worker metadata: either one of the leaf filters or an
/// `And`/`Or`/`Not` combination of other filters.
///
/// With serde the variant is selected by a `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum WorkerFilter {
    Name(WorkerNameFilter),
    Status(WorkerStatusFilter),
    Version(WorkerVersionFilter),
    CreatedAt(WorkerCreatedAtFilter),
    Env(WorkerEnvFilter),
    And(WorkerAndFilter),
    Or(WorkerOrFilter),
    Not(WorkerNotFilter),
}
1178
1179impl WorkerFilter {
1180 pub fn and(&self, filter: WorkerFilter) -> Self {
1181 match self.clone() {
1182 WorkerFilter::And(WorkerAndFilter { filters }) => {
1183 Self::new_and([filters, vec![filter]].concat())
1184 }
1185 f => Self::new_and(vec![f, filter]),
1186 }
1187 }
1188
1189 pub fn or(&self, filter: WorkerFilter) -> Self {
1190 match self.clone() {
1191 WorkerFilter::Or(WorkerOrFilter { filters }) => {
1192 Self::new_or([filters, vec![filter]].concat())
1193 }
1194 f => Self::new_or(vec![f, filter]),
1195 }
1196 }
1197
1198 pub fn not(&self) -> Self {
1199 Self::new_not(self.clone())
1200 }
1201
1202 pub fn matches(&self, metadata: &WorkerMetadata) -> bool {
1203 match self.clone() {
1204 WorkerFilter::Name(WorkerNameFilter { comparator, value }) => {
1205 comparator.matches(&metadata.worker_id.worker_name, &value)
1206 }
1207 WorkerFilter::Version(WorkerVersionFilter { comparator, value }) => {
1208 let version: ComponentVersion = metadata.last_known_status.component_version;
1209 comparator.matches(&version, &value)
1210 }
1211 WorkerFilter::Env(WorkerEnvFilter {
1212 name,
1213 comparator,
1214 value,
1215 }) => {
1216 let mut result = false;
1217 let name = name.to_lowercase();
1218 for env_value in metadata.env.clone() {
1219 if env_value.0.to_lowercase() == name {
1220 result = comparator.matches(&env_value.1, &value);
1221
1222 break;
1223 }
1224 }
1225 result
1226 }
1227 WorkerFilter::CreatedAt(WorkerCreatedAtFilter { comparator, value }) => {
1228 comparator.matches(&metadata.created_at, &value)
1229 }
1230 WorkerFilter::Status(WorkerStatusFilter { comparator, value }) => {
1231 comparator.matches(&metadata.last_known_status.status, &value)
1232 }
1233 WorkerFilter::Not(WorkerNotFilter { filter }) => !filter.matches(metadata),
1234 WorkerFilter::And(WorkerAndFilter { filters }) => {
1235 let mut result = true;
1236 for filter in filters {
1237 if !filter.matches(metadata) {
1238 result = false;
1239 break;
1240 }
1241 }
1242 result
1243 }
1244 WorkerFilter::Or(WorkerOrFilter { filters }) => {
1245 let mut result = true;
1246 if !filters.is_empty() {
1247 result = false;
1248 for filter in filters {
1249 if filter.matches(metadata) {
1250 result = true;
1251 break;
1252 }
1253 }
1254 }
1255 result
1256 }
1257 }
1258 }
1259
1260 pub fn new_and(filters: Vec<WorkerFilter>) -> Self {
1261 WorkerFilter::And(WorkerAndFilter::new(filters))
1262 }
1263
1264 pub fn new_or(filters: Vec<WorkerFilter>) -> Self {
1265 WorkerFilter::Or(WorkerOrFilter::new(filters))
1266 }
1267
1268 pub fn new_not(filter: WorkerFilter) -> Self {
1269 WorkerFilter::Not(WorkerNotFilter::new(filter))
1270 }
1271
1272 pub fn new_name(comparator: StringFilterComparator, value: String) -> Self {
1273 WorkerFilter::Name(WorkerNameFilter::new(comparator, value))
1274 }
1275
1276 pub fn new_env(name: String, comparator: StringFilterComparator, value: String) -> Self {
1277 WorkerFilter::Env(WorkerEnvFilter::new(name, comparator, value))
1278 }
1279
1280 pub fn new_version(comparator: FilterComparator, value: ComponentVersion) -> Self {
1281 WorkerFilter::Version(WorkerVersionFilter::new(comparator, value))
1282 }
1283
1284 pub fn new_status(comparator: FilterComparator, value: WorkerStatus) -> Self {
1285 WorkerFilter::Status(WorkerStatusFilter::new(comparator, value))
1286 }
1287
1288 pub fn new_created_at(comparator: FilterComparator, value: Timestamp) -> Self {
1289 WorkerFilter::CreatedAt(WorkerCreatedAtFilter::new(comparator, value))
1290 }
1291
1292 pub fn from(filters: Vec<String>) -> Result<WorkerFilter, String> {
1293 let mut fs = Vec::new();
1294 for f in filters {
1295 fs.push(WorkerFilter::from_str(&f)?);
1296 }
1297 Ok(WorkerFilter::new_and(fs))
1298 }
1299}
1300
1301impl Display for WorkerFilter {
1302 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1303 match self {
1304 WorkerFilter::Name(filter) => {
1305 write!(f, "{}", filter)
1306 }
1307 WorkerFilter::Version(filter) => {
1308 write!(f, "{}", filter)
1309 }
1310 WorkerFilter::Status(filter) => {
1311 write!(f, "{}", filter)
1312 }
1313 WorkerFilter::CreatedAt(filter) => {
1314 write!(f, "{}", filter)
1315 }
1316 WorkerFilter::Env(filter) => {
1317 write!(f, "{}", filter)
1318 }
1319 WorkerFilter::Not(filter) => {
1320 write!(f, "{}", filter)
1321 }
1322 WorkerFilter::And(filter) => {
1323 write!(f, "{}", filter)
1324 }
1325 WorkerFilter::Or(filter) => {
1326 write!(f, "{}", filter)
1327 }
1328 }
1329 }
1330}
1331
1332impl FromStr for WorkerFilter {
1333 type Err = String;
1334
1335 fn from_str(s: &str) -> Result<Self, Self::Err> {
1336 let elements = s.split_whitespace().collect::<Vec<&str>>();
1337
1338 if elements.len() == 3 {
1339 let arg = elements[0];
1340 let comparator = elements[1];
1341 let value = elements[2];
1342 match arg {
1343 "name" => Ok(WorkerFilter::new_name(
1344 comparator.parse()?,
1345 value.to_string(),
1346 )),
1347 "version" => Ok(WorkerFilter::new_version(
1348 comparator.parse()?,
1349 value
1350 .parse()
1351 .map_err(|e| format!("Invalid filter value: {}", e))?,
1352 )),
1353 "status" => Ok(WorkerFilter::new_status(
1354 comparator.parse()?,
1355 value.parse()?,
1356 )),
1357 "created_at" | "createdAt" => Ok(WorkerFilter::new_created_at(
1358 comparator.parse()?,
1359 value.parse()?,
1360 )),
1361 _ if arg.starts_with("env.") => {
1362 let name = &arg[4..];
1363 Ok(WorkerFilter::new_env(
1364 name.to_string(),
1365 comparator.parse()?,
1366 value.to_string(),
1367 ))
1368 }
1369 _ => Err(format!("Invalid filter: {}", s)),
1370 }
1371 } else {
1372 Err(format!("Invalid filter: {}", s))
1373 }
1374 }
1375}
1376
1377#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1378#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1379pub enum StringFilterComparator {
1380 Equal,
1381 NotEqual,
1382 Like,
1383 NotLike,
1384}
1385
1386impl StringFilterComparator {
1387 pub fn matches<T: Display>(&self, value1: &T, value2: &T) -> bool {
1388 match self {
1389 StringFilterComparator::Equal => value1.to_string() == value2.to_string(),
1390 StringFilterComparator::NotEqual => value1.to_string() != value2.to_string(),
1391 StringFilterComparator::Like => {
1392 value1.to_string().contains(value2.to_string().as_str())
1393 }
1394 StringFilterComparator::NotLike => {
1395 !value1.to_string().contains(value2.to_string().as_str())
1396 }
1397 }
1398 }
1399}
1400
1401impl FromStr for StringFilterComparator {
1402 type Err = String;
1403
1404 fn from_str(s: &str) -> Result<Self, Self::Err> {
1405 match s.to_lowercase().as_str() {
1406 "==" | "=" | "equal" | "eq" => Ok(StringFilterComparator::Equal),
1407 "!=" | "notequal" | "ne" => Ok(StringFilterComparator::NotEqual),
1408 "like" => Ok(StringFilterComparator::Like),
1409 "notlike" => Ok(StringFilterComparator::NotLike),
1410 _ => Err(format!("Unknown String Filter Comparator: {}", s)),
1411 }
1412 }
1413}
1414
1415impl TryFrom<i32> for StringFilterComparator {
1416 type Error = String;
1417
1418 fn try_from(value: i32) -> Result<Self, Self::Error> {
1419 match value {
1420 0 => Ok(StringFilterComparator::Equal),
1421 1 => Ok(StringFilterComparator::NotEqual),
1422 2 => Ok(StringFilterComparator::Like),
1423 3 => Ok(StringFilterComparator::NotLike),
1424 _ => Err(format!("Unknown String Filter Comparator: {}", value)),
1425 }
1426 }
1427}
1428
1429impl From<StringFilterComparator> for i32 {
1430 fn from(value: StringFilterComparator) -> Self {
1431 match value {
1432 StringFilterComparator::Equal => 0,
1433 StringFilterComparator::NotEqual => 1,
1434 StringFilterComparator::Like => 2,
1435 StringFilterComparator::NotLike => 3,
1436 }
1437 }
1438}
1439
1440impl Display for StringFilterComparator {
1441 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1442 let s = match self {
1443 StringFilterComparator::Equal => "==",
1444 StringFilterComparator::NotEqual => "!=",
1445 StringFilterComparator::Like => "like",
1446 StringFilterComparator::NotLike => "notlike",
1447 };
1448 write!(f, "{}", s)
1449 }
1450}
1451
1452#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
1453#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1454pub enum FilterComparator {
1455 Equal,
1456 NotEqual,
1457 GreaterEqual,
1458 Greater,
1459 LessEqual,
1460 Less,
1461}
1462
1463impl Display for FilterComparator {
1464 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1465 let s = match self {
1466 FilterComparator::Equal => "==",
1467 FilterComparator::NotEqual => "!=",
1468 FilterComparator::GreaterEqual => ">=",
1469 FilterComparator::Greater => ">",
1470 FilterComparator::LessEqual => "<=",
1471 FilterComparator::Less => "<",
1472 };
1473 write!(f, "{}", s)
1474 }
1475}
1476
1477impl FilterComparator {
1478 pub fn matches<T: Ord>(&self, value1: &T, value2: &T) -> bool {
1479 match self {
1480 FilterComparator::Equal => value1 == value2,
1481 FilterComparator::NotEqual => value1 != value2,
1482 FilterComparator::Less => value1 < value2,
1483 FilterComparator::LessEqual => value1 <= value2,
1484 FilterComparator::Greater => value1 > value2,
1485 FilterComparator::GreaterEqual => value1 >= value2,
1486 }
1487 }
1488}
1489
1490impl FromStr for FilterComparator {
1491 type Err = String;
1492 fn from_str(s: &str) -> Result<Self, Self::Err> {
1493 match s.to_lowercase().as_str() {
1494 "==" | "=" | "equal" | "eq" => Ok(FilterComparator::Equal),
1495 "!=" | "notequal" | "ne" => Ok(FilterComparator::NotEqual),
1496 ">=" | "greaterequal" | "ge" => Ok(FilterComparator::GreaterEqual),
1497 ">" | "greater" | "gt" => Ok(FilterComparator::Greater),
1498 "<=" | "lessequal" | "le" => Ok(FilterComparator::LessEqual),
1499 "<" | "less" | "lt" => Ok(FilterComparator::Less),
1500 _ => Err(format!("Unknown Filter Comparator: {}", s)),
1501 }
1502 }
1503}
1504
1505impl TryFrom<i32> for FilterComparator {
1506 type Error = String;
1507
1508 fn try_from(value: i32) -> Result<Self, Self::Error> {
1509 match value {
1510 0 => Ok(FilterComparator::Equal),
1511 1 => Ok(FilterComparator::NotEqual),
1512 2 => Ok(FilterComparator::Less),
1513 3 => Ok(FilterComparator::LessEqual),
1514 4 => Ok(FilterComparator::Greater),
1515 5 => Ok(FilterComparator::GreaterEqual),
1516 _ => Err(format!("Unknown Filter Comparator: {}", value)),
1517 }
1518 }
1519}
1520
1521impl From<FilterComparator> for i32 {
1522 fn from(value: FilterComparator) -> Self {
1523 match value {
1524 FilterComparator::Equal => 0,
1525 FilterComparator::NotEqual => 1,
1526 FilterComparator::Less => 2,
1527 FilterComparator::LessEqual => 3,
1528 FilterComparator::Greater => 4,
1529 FilterComparator::GreaterEqual => 5,
1530 }
1531 }
1532}
1533
1534#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, Default)]
1535#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1536#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1537#[serde(rename_all = "camelCase")]
1538pub struct ScanCursor {
1539 pub cursor: u64,
1540 pub layer: usize,
1541}
1542
1543impl ScanCursor {
1544 pub fn is_active_layer_finished(&self) -> bool {
1545 self.cursor == 0
1546 }
1547
1548 pub fn is_finished(&self) -> bool {
1549 self.cursor == 0 && self.layer == 0
1550 }
1551
1552 pub fn into_option(self) -> Option<Self> {
1553 if self.is_finished() {
1554 None
1555 } else {
1556 Some(self)
1557 }
1558 }
1559}
1560
1561impl Display for ScanCursor {
1562 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1563 write!(f, "{}/{}", self.layer, self.cursor)
1564 }
1565}
1566
1567impl FromStr for ScanCursor {
1568 type Err = String;
1569
1570 fn from_str(s: &str) -> Result<Self, Self::Err> {
1571 let parts = s.split('/').collect::<Vec<&str>>();
1572 if parts.len() == 2 {
1573 Ok(ScanCursor {
1574 layer: parts[0]
1575 .parse()
1576 .map_err(|e| format!("Invalid layer part: {}", e))?,
1577 cursor: parts[1]
1578 .parse()
1579 .map_err(|e| format!("Invalid cursor part: {}", e))?,
1580 })
1581 } else {
1582 Err("Invalid cursor, must have 'layer/cursor' format".to_string())
1583 }
1584 }
1585}
1586
1587#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
1588pub enum LogLevel {
1589 Trace,
1590 Debug,
1591 Info,
1592 Warn,
1593 Error,
1594 Critical,
1595}
1596
1597#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1598pub enum WorkerEvent {
1599 StdOut {
1600 timestamp: Timestamp,
1601 bytes: Vec<u8>,
1602 },
1603 StdErr {
1604 timestamp: Timestamp,
1605 bytes: Vec<u8>,
1606 },
1607 Log {
1608 timestamp: Timestamp,
1609 level: LogLevel,
1610 context: String,
1611 message: String,
1612 },
1613 InvocationStart {
1614 timestamp: Timestamp,
1615 function: String,
1616 idempotency_key: IdempotencyKey,
1617 },
1618 InvocationFinished {
1619 timestamp: Timestamp,
1620 function: String,
1621 idempotency_key: IdempotencyKey,
1622 },
1623 Close,
1624}
1625
1626impl WorkerEvent {
1627 pub fn stdout(bytes: Vec<u8>) -> WorkerEvent {
1628 WorkerEvent::StdOut {
1629 timestamp: Timestamp::now_utc(),
1630 bytes,
1631 }
1632 }
1633
1634 pub fn stderr(bytes: Vec<u8>) -> WorkerEvent {
1635 WorkerEvent::StdErr {
1636 timestamp: Timestamp::now_utc(),
1637 bytes,
1638 }
1639 }
1640
1641 pub fn log(level: LogLevel, context: &str, message: &str) -> WorkerEvent {
1642 WorkerEvent::Log {
1643 timestamp: Timestamp::now_utc(),
1644 level,
1645 context: context.to_string(),
1646 message: message.to_string(),
1647 }
1648 }
1649
1650 pub fn invocation_start(function: &str, idempotency_key: &IdempotencyKey) -> WorkerEvent {
1651 WorkerEvent::InvocationStart {
1652 timestamp: Timestamp::now_utc(),
1653 function: function.to_string(),
1654 idempotency_key: idempotency_key.clone(),
1655 }
1656 }
1657
1658 pub fn invocation_finished(function: &str, idempotency_key: &IdempotencyKey) -> WorkerEvent {
1659 WorkerEvent::InvocationFinished {
1660 timestamp: Timestamp::now_utc(),
1661 function: function.to_string(),
1662 idempotency_key: idempotency_key.clone(),
1663 }
1664 }
1665
1666 pub fn as_oplog_entry(&self) -> Option<OplogEntry> {
1667 match self {
1668 WorkerEvent::StdOut { timestamp, bytes } => Some(OplogEntry::Log {
1669 timestamp: *timestamp,
1670 level: oplog::LogLevel::Stdout,
1671 context: String::new(),
1672 message: String::from_utf8_lossy(bytes).to_string(),
1673 }),
1674 WorkerEvent::StdErr { timestamp, bytes } => Some(OplogEntry::Log {
1675 timestamp: *timestamp,
1676 level: oplog::LogLevel::Stderr,
1677 context: String::new(),
1678 message: String::from_utf8_lossy(bytes).to_string(),
1679 }),
1680 WorkerEvent::Log {
1681 timestamp,
1682 level,
1683 context,
1684 message,
1685 } => Some(OplogEntry::Log {
1686 timestamp: *timestamp,
1687 level: match level {
1688 LogLevel::Trace => oplog::LogLevel::Trace,
1689 LogLevel::Debug => oplog::LogLevel::Debug,
1690 LogLevel::Info => oplog::LogLevel::Info,
1691 LogLevel::Warn => oplog::LogLevel::Warn,
1692 LogLevel::Error => oplog::LogLevel::Error,
1693 LogLevel::Critical => oplog::LogLevel::Critical,
1694 },
1695 context: context.clone(),
1696 message: message.clone(),
1697 }),
1698 WorkerEvent::InvocationStart { .. } => None,
1699 WorkerEvent::InvocationFinished { .. } => None,
1700 WorkerEvent::Close => None,
1701 }
1702 }
1703}
1704
1705impl Display for WorkerEvent {
1706 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1707 match self {
1708 WorkerEvent::StdOut { bytes, .. } => {
1709 write!(
1710 f,
1711 "<stdout> {}",
1712 String::from_utf8(bytes.clone()).unwrap_or_default()
1713 )
1714 }
1715 WorkerEvent::StdErr { bytes, .. } => {
1716 write!(
1717 f,
1718 "<stderr> {}",
1719 String::from_utf8(bytes.clone()).unwrap_or_default()
1720 )
1721 }
1722 WorkerEvent::Log {
1723 level,
1724 context,
1725 message,
1726 ..
1727 } => {
1728 write!(f, "<log> {:?} {} {}", level, context, message)
1729 }
1730 WorkerEvent::InvocationStart {
1731 function,
1732 idempotency_key,
1733 ..
1734 } => {
1735 write!(f, "<invocation-start> {} {}", function, idempotency_key)
1736 }
1737 WorkerEvent::InvocationFinished {
1738 function,
1739 idempotency_key,
1740 ..
1741 } => {
1742 write!(f, "<invocation-finished> {} {}", function, idempotency_key)
1743 }
1744 WorkerEvent::Close => {
1745 write!(f, "<close>")
1746 }
1747 }
1748 }
1749}
1750
1751#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
1752#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1753#[repr(i32)]
1754pub enum ComponentType {
1755 Durable = 0,
1756 Ephemeral = 1,
1757}
1758
1759impl TryFrom<i32> for ComponentType {
1760 type Error = String;
1761
1762 fn try_from(value: i32) -> Result<Self, Self::Error> {
1763 match value {
1764 0 => Ok(ComponentType::Durable),
1765 1 => Ok(ComponentType::Ephemeral),
1766 _ => Err(format!("Unknown Component Type: {}", value)),
1767 }
1768 }
1769}
1770
1771impl Display for ComponentType {
1772 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1773 let s = match self {
1774 ComponentType::Durable => "Durable",
1775 ComponentType::Ephemeral => "Ephemeral",
1776 };
1777 write!(f, "{}", s)
1778 }
1779}
1780
1781impl FromStr for ComponentType {
1782 type Err = String;
1783
1784 fn from_str(s: &str) -> Result<Self, Self::Err> {
1785 match s {
1786 "Durable" => Ok(ComponentType::Durable),
1787 "Ephemeral" => Ok(ComponentType::Ephemeral),
1788 _ => Err(format!("Unknown Component Type: {}", s)),
1789 }
1790 }
1791}
1792
1793#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1794#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1795#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1796#[serde(rename_all = "camelCase")]
1797pub struct Empty {}
1798
1799#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1802#[cfg_attr(feature = "poem", derive(poem_openapi::NewType))]
1803pub struct InitialComponentFileKey(pub String);
1804
1805impl Display for InitialComponentFileKey {
1806 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1807 self.0.fmt(f)
1808 }
1809}
1810
1811#[derive(Clone, Debug, Eq, PartialEq, Hash)]
1817pub struct ComponentFilePath(Utf8UnixPathBuf);
1818
1819impl ComponentFilePath {
1820 pub fn from_abs_str(s: &str) -> Result<Self, String> {
1821 let buf: Utf8UnixPathBuf = s.into();
1822 if !buf.is_absolute() {
1823 return Err("Path must be absolute".to_string());
1824 }
1825
1826 Ok(ComponentFilePath(buf.normalize()))
1827 }
1828
1829 pub fn from_rel_str(s: &str) -> Result<Self, String> {
1830 Self::from_abs_str(&format!("/{}", s))
1831 }
1832
1833 pub fn from_either_str(s: &str) -> Result<Self, String> {
1834 if s.starts_with('/') {
1835 Self::from_abs_str(s)
1836 } else {
1837 Self::from_rel_str(s)
1838 }
1839 }
1840
1841 pub fn as_path(&self) -> &Utf8UnixPathBuf {
1842 &self.0
1843 }
1844
1845 pub fn to_rel_string(&self) -> String {
1846 self.0.strip_prefix("/").unwrap().to_string()
1847 }
1848
1849 pub fn extend(&mut self, path: &str) -> Result<(), String> {
1850 self.0.push_checked(path).map_err(|e| e.to_string())
1851 }
1852}
1853
1854impl Display for ComponentFilePath {
1855 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1856 self.0.fmt(f)
1857 }
1858}
1859
1860impl Serialize for ComponentFilePath {
1861 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1862 where
1863 S: Serializer,
1864 {
1865 String::serialize(&self.to_string(), serializer)
1866 }
1867}
1868
1869impl<'de> Deserialize<'de> for ComponentFilePath {
1870 fn deserialize<D>(deserializer: D) -> Result<ComponentFilePath, D::Error>
1871 where
1872 D: Deserializer<'de>,
1873 {
1874 let str = String::deserialize(deserializer)?;
1875 Self::from_abs_str(&str).map_err(de::Error::custom)
1876 }
1877}
1878
1879impl TryFrom<&str> for ComponentFilePath {
1880 type Error = String;
1881 fn try_from(value: &str) -> Result<Self, Self::Error> {
1882 Self::from_either_str(value)
1883 }
1884}
1885
1886#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
1887#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1888#[serde(rename_all = "kebab-case")]
1889#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
1890pub enum ComponentFilePermissions {
1891 ReadOnly,
1892 ReadWrite,
1893}
1894
1895impl ComponentFilePermissions {
1896 pub fn as_compact_str(&self) -> &'static str {
1897 match self {
1898 ComponentFilePermissions::ReadOnly => "ro",
1899 ComponentFilePermissions::ReadWrite => "rw",
1900 }
1901 }
1902 pub fn from_compact_str(s: &str) -> Result<Self, String> {
1903 match s {
1904 "ro" => Ok(ComponentFilePermissions::ReadOnly),
1905 "rw" => Ok(ComponentFilePermissions::ReadWrite),
1906 _ => Err(format!("Unknown permissions: {}", s)),
1907 }
1908 }
1909}
1910
1911#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1912#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1913#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1914#[serde(rename_all = "camelCase")]
1915pub struct InitialComponentFile {
1916 pub key: InitialComponentFileKey,
1917 pub path: ComponentFilePath,
1918 pub permissions: ComponentFilePermissions,
1919}
1920
1921impl InitialComponentFile {
1922 pub fn is_read_only(&self) -> bool {
1923 self.permissions == ComponentFilePermissions::ReadOnly
1924 }
1925}
1926
1927#[derive(Clone, Debug, Serialize, Deserialize)]
1928#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1929#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1930#[serde(rename_all = "camelCase")]
1931pub struct ComponentFilePathWithPermissions {
1932 pub path: ComponentFilePath,
1933 pub permissions: ComponentFilePermissions,
1934}
1935
1936impl ComponentFilePathWithPermissions {
1937 pub fn extend_path(&mut self, path: &str) -> Result<(), String> {
1938 self.path.extend(path)
1939 }
1940}
1941
1942impl Display for ComponentFilePathWithPermissions {
1943 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1944 write!(f, "{}", serde_json::to_string(self).unwrap())
1945 }
1946}
1947
1948#[derive(Clone, Debug, Serialize, Deserialize)]
1949#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1950#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1951#[serde(rename_all = "camelCase")]
1952pub struct ComponentFilePathWithPermissionsList {
1953 pub values: Vec<ComponentFilePathWithPermissions>,
1954}
1955
1956impl Display for ComponentFilePathWithPermissionsList {
1957 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1958 write!(f, "{}", serde_json::to_string(self).unwrap())
1959 }
1960}
1961
1962#[derive(Clone, Debug, PartialEq)]
1963pub enum ComponentFileSystemNodeDetails {
1964 File {
1965 permissions: ComponentFilePermissions,
1966 size: u64,
1967 },
1968 Directory,
1969}
1970
1971#[derive(Clone, Debug, PartialEq)]
1972pub struct ComponentFileSystemNode {
1973 pub name: String,
1974 pub last_modified: SystemTime,
1975 pub details: ComponentFileSystemNodeDetails,
1976}
1977
1978#[derive(Debug, Clone, PartialEq, Serialize, Encode, Decode, Default)]
1979#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1980#[serde(rename_all = "kebab-case")]
1981#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
1982pub enum GatewayBindingType {
1983 #[default]
1984 Default,
1985 FileServer,
1986 HttpHandler,
1987 CorsPreflight,
1988}
1989
1990impl<'de> Deserialize<'de> for GatewayBindingType {
1992 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1993 where
1994 D: Deserializer<'de>,
1995 {
1996 struct GatewayBindingTypeVisitor;
1997
1998 impl de::Visitor<'_> for GatewayBindingTypeVisitor {
1999 type Value = GatewayBindingType;
2000
2001 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
2002 formatter.write_str("a string representing the binding type")
2003 }
2004
2005 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
2006 where
2007 E: de::Error,
2008 {
2009 match value {
2010 "default" | "wit-worker" => Ok(GatewayBindingType::Default),
2011 "file-server" => Ok(GatewayBindingType::FileServer),
2012 "cors-preflight" => Ok(GatewayBindingType::CorsPreflight),
2013 _ => Err(de::Error::invalid_value(Unexpected::Str(value), &self)),
2014 }
2015 }
2016 }
2017
2018 deserializer.deserialize_str(GatewayBindingTypeVisitor)
2019 }
2020}
2021
2022impl TryFrom<String> for GatewayBindingType {
2023 type Error = String;
2024
2025 fn try_from(value: String) -> Result<Self, Self::Error> {
2026 match value.as_str() {
2027 "default" => Ok(GatewayBindingType::Default),
2028 "file-server" => Ok(GatewayBindingType::FileServer),
2029 _ => Err(format!("Invalid WorkerBindingType: {}", value)),
2030 }
2031 }
2032}
2033
2034impl From<crate::model::WorkerId> for golem_wasm_rpc::WorkerId {
2035 fn from(worker_id: crate::model::WorkerId) -> Self {
2036 golem_wasm_rpc::WorkerId {
2037 component_id: worker_id.component_id.into(),
2038 worker_name: worker_id.worker_name,
2039 }
2040 }
2041}
2042
2043impl From<golem_wasm_rpc::WorkerId> for crate::model::WorkerId {
2044 fn from(host: golem_wasm_rpc::WorkerId) -> Self {
2045 Self {
2046 component_id: host.component_id.into(),
2047 worker_name: host.worker_name,
2048 }
2049 }
2050}
2051
2052impl From<golem_wasm_rpc::ComponentId> for crate::model::ComponentId {
2053 fn from(host: golem_wasm_rpc::ComponentId) -> Self {
2054 let high_bits = host.uuid.high_bits;
2055 let low_bits = host.uuid.low_bits;
2056
2057 Self(uuid::Uuid::from_u64_pair(high_bits, low_bits))
2058 }
2059}
2060
2061impl From<crate::model::ComponentId> for golem_wasm_rpc::ComponentId {
2062 fn from(component_id: crate::model::ComponentId) -> Self {
2063 let (high_bits, low_bits) = component_id.0.as_u64_pair();
2064
2065 golem_wasm_rpc::ComponentId {
2066 uuid: golem_wasm_rpc::Uuid {
2067 high_bits,
2068 low_bits,
2069 },
2070 }
2071 }
2072}
2073
2074#[cfg(test)]
2075mod tests {
2076 use std::collections::HashSet;
2077 use std::str::FromStr;
2078 use std::time::SystemTime;
2079 use std::vec;
2080 use test_r::test;
2081 use tracing::info;
2082
2083 use crate::model::oplog::OplogIndex;
2084
2085 use crate::model::{
2086 AccountId, ComponentFilePath, ComponentId, FilterComparator, IdempotencyKey, ShardId,
2087 StringFilterComparator, TargetWorkerId, Timestamp, WorkerFilter, WorkerId, WorkerMetadata,
2088 WorkerStatus, WorkerStatusRecord,
2089 };
2090 use bincode::{Decode, Encode};
2091
2092 use rand::{rng, Rng};
2093 use serde::{Deserialize, Serialize};
2094
#[test]
fn timestamp_conversion() {
    // A timestamp must survive the round trip through the prost timestamp type.
    let original = Timestamp::now_utc();

    let as_prost: prost_types::Timestamp = original.into();
    let round_tripped: Timestamp = as_prost.into();

    assert_eq!(round_tripped, original);
}
2105
2106 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
2107 struct ExampleWithAccountId {
2108 account_id: AccountId,
2109 }
2110
2111 #[test]
2112 fn account_id_from_json_apigateway_version() {
2113 let json = "{ \"account_id\": \"account-1\" }";
2114 let example: ExampleWithAccountId = serde_json::from_str(json).unwrap();
2115 assert_eq!(
2116 example.account_id,
2117 AccountId {
2118 value: "account-1".to_string()
2119 }
2120 );
2121 }
2122
2123 #[test]
2124 fn account_id_json_serialization() {
2125 let example: ExampleWithAccountId = ExampleWithAccountId {
2127 account_id: AccountId {
2128 value: "account-1".to_string(),
2129 },
2130 };
2131 let json = serde_json::to_string(&example).unwrap();
2132 assert_eq!(json, "{\"account_id\":\"account-1\"}");
2133 }
2134
#[test]
fn worker_filter_parse() {
    // Each input string paired with the filter it must parse into.
    let cases = vec![
        (
            " name = worker-1",
            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
        ),
        (
            "status == Running",
            WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
        ),
        (
            "version >= 10",
            WorkerFilter::new_version(FilterComparator::GreaterEqual, 10),
        ),
        (
            "env.tag1 == abc ",
            WorkerFilter::new_env(
                "tag1".to_string(),
                StringFilterComparator::Equal,
                "abc".to_string(),
            ),
        ),
    ];

    for (input, expected) in cases {
        assert_eq!(WorkerFilter::from_str(input).unwrap(), expected);
    }
}
2161
#[test]
fn worker_filter_combination() {
    // Builders for the leaf filters reused throughout the assertions.
    let name = || WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string());
    let running = || WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running);
    let not_running =
        || WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running);
    let version_1 = || WorkerFilter::new_version(FilterComparator::Equal, 1);

    // Negation wraps the filter.
    assert_eq!(name().not(), WorkerFilter::new_not(name()));

    // `and` creates a flat And and keeps appending to it.
    assert_eq!(
        name().and(running()),
        WorkerFilter::new_and(vec![name(), running()])
    );
    assert_eq!(
        name().and(running()).and(version_1()),
        WorkerFilter::new_and(vec![name(), running(), version_1()])
    );

    // `or` creates a flat Or and keeps appending to it.
    assert_eq!(
        name().or(running()),
        WorkerFilter::new_or(vec![name(), running()])
    );
    assert_eq!(
        name().or(not_running()).or(version_1()),
        WorkerFilter::new_or(vec![name(), not_running(), version_1()])
    );

    // Mixing operators nests the earlier combination.
    assert_eq!(
        name().and(not_running()).or(version_1()),
        WorkerFilter::new_or(vec![
            WorkerFilter::new_and(vec![name(), not_running()]),
            version_1(),
        ])
    );
    assert_eq!(
        name().or(not_running()).and(version_1()),
        WorkerFilter::new_and(vec![
            WorkerFilter::new_or(vec![name(), not_running()]),
            version_1(),
        ])
    );
}
2252
#[test]
fn worker_filter_matches() {
    // Worker under test: named `worker-1`, component version 1, two
    // environment variables, status taken from the default status record.
    let worker_metadata = WorkerMetadata {
        worker_id: WorkerId {
            worker_name: "worker-1".to_string(),
            component_id: ComponentId::new_v4(),
        },
        args: vec![],
        env: vec![
            ("env1".to_string(), "value1".to_string()),
            ("env2".to_string(), "value2".to_string()),
        ],
        account_id: AccountId {
            value: "account-1".to_string(),
        },
        created_at: Timestamp::now_utc(),
        parent: None,
        last_known_status: WorkerStatusRecord {
            component_version: 1,
            ..WorkerStatusRecord::default()
        },
    };

    // Builders for the leaf filters used below.
    let name_is =
        |name: &str| WorkerFilter::new_name(StringFilterComparator::Equal, name.to_string());
    let status_is =
        |status: WorkerStatus| WorkerFilter::new_status(FilterComparator::Equal, status);
    let env1_is = |value: &str| {
        WorkerFilter::new_env(
            "env1".to_string(),
            StringFilterComparator::Equal,
            value.to_string(),
        )
    };
    let version = |comparator: FilterComparator, value| {
        WorkerFilter::new_version(comparator, value)
    };

    assert!(name_is("worker-1")
        .and(status_is(WorkerStatus::Idle))
        .matches(&worker_metadata));

    assert!(env1_is("value1")
        .and(status_is(WorkerStatus::Idle))
        .matches(&worker_metadata));

    assert!(env1_is("value2")
        .not()
        .and(status_is(WorkerStatus::Running).or(status_is(WorkerStatus::Idle)))
        .matches(&worker_metadata));

    assert!(name_is("worker-1")
        .and(version(FilterComparator::Equal, 1))
        .matches(&worker_metadata));

    assert!(name_is("worker-2")
        .or(version(FilterComparator::Equal, 1))
        .matches(&worker_metadata));

    assert!(version(FilterComparator::GreaterEqual, 1)
        .and(version(FilterComparator::Less, 2))
        .or(name_is("worker-2"))
        .matches(&worker_metadata));
}
2330
// Checks that `TargetWorkerId::into_worker_id`, when given a non-empty set of allowed
// shards, yields a worker id that `ShardId::from_worker_id` maps into that set.
//
// Each of the `EXAMPLE_COUNT` rounds draws a random number (0..100) of random shard ids
// out of `SHARD_COUNT` and converts a target worker id that has no worker name. The time
// taken by the conversion is logged for information only.
#[test]
fn target_worker_id_force_shards() {
    const SHARD_COUNT: usize = 1000;
    const EXAMPLE_COUNT: usize = 1000;

    let mut rng = rng();
    for _ in 0..EXAMPLE_COUNT {
        let mut shard_ids = HashSet::new();
        let count = rng.random_range(0..100);
        for _ in 0..count {
            let shard_id = rng.random_range(0..SHARD_COUNT);
            shard_ids.insert(ShardId {
                // Lossless: `shard_id` is below `SHARD_COUNT` (1000).
                value: shard_id as i64,
            });
        }

        let component_id = ComponentId::new_v4();
        let target_worker_id = TargetWorkerId {
            component_id,
            worker_name: None,
        };

        // Measure with the monotonic clock. The wall clock (`SystemTime`) may be adjusted
        // backwards between two readings, in which case `duration_since` returns an error
        // and unwrapping it would fail this test for reasons unrelated to sharding.
        let start = std::time::Instant::now();
        let worker_id = target_worker_id.into_worker_id(&shard_ids, SHARD_COUNT);
        let elapsed = start.elapsed();
        info!("Time with {count} valid shards: {:?}", elapsed);

        // With an empty shard set there is nothing to check the result against.
        if !shard_ids.is_empty() {
            assert!(shard_ids.contains(&ShardId::from_worker_id(&worker_id, SHARD_COUNT)));
        }
    }
}
2365
// Exercises `IdempotencyKey::derived`: deriving twice from the same base key and oplog
// index must give equal keys, while a different base or a different index must give a
// different key.
#[test]
fn derived_idempotency_key() {
    let fresh_a = IdempotencyKey::fresh();
    let fresh_b = IdempotencyKey::fresh();
    let fixed = IdempotencyKey {
        value: "base3".to_string(),
    };

    // Two fresh keys are expected to differ from each other.
    assert_ne!(fresh_a, fresh_b);

    let low = OplogIndex::from_u64(2);
    let high = OplogIndex::from_u64(11);

    // Layout of both passes: [a/low, a/high, b/low, b/high].
    let first_pass = [
        IdempotencyKey::derived(&fresh_a, low),
        IdempotencyKey::derived(&fresh_a, high),
        IdempotencyKey::derived(&fresh_b, low),
        IdempotencyKey::derived(&fresh_b, high),
    ];
    let second_pass = [
        IdempotencyKey::derived(&fresh_a, low),
        IdempotencyKey::derived(&fresh_a, high),
        IdempotencyKey::derived(&fresh_b, low),
        IdempotencyKey::derived(&fresh_b, high),
    ];

    let fixed_low = IdempotencyKey::derived(&fixed, low);
    let fixed_high = IdempotencyKey::derived(&fixed, high);

    // Same base and same index: the derivation is repeatable.
    for (first, second) in first_pass.iter().zip(second_pass.iter()) {
        assert_eq!(first, second);
    }

    // Every pair of distinct (base, index) combinations yields distinct keys.
    for (position, earlier) in first_pass.iter().enumerate() {
        for later in &first_pass[position + 1..] {
            assert_ne!(earlier, later);
        }
    }

    // Keys derived from the fixed-value base differ from those of the fresh bases at the
    // same index, and from each other across indices.
    let [a_low, a_high, b_low, b_high] = &first_pass;
    assert_ne!(a_low, &fixed_low);
    assert_ne!(b_low, &fixed_low);
    assert_ne!(a_high, &fixed_high);
    assert_ne!(b_high, &fixed_high);
    assert_ne!(fixed_low, fixed_high);
}
2410
// An absolute path is accepted by `ComponentFilePath::from_abs_str` and is rendered back
// unchanged by `to_string`.
#[test]
fn initial_component_file_path_from_absolute() {
    let rendered = ComponentFilePath::from_abs_str("/a/b/c")
        .unwrap()
        .to_string();
    assert_eq!(rendered, "/a/b/c");
}
2416
// A path without a leading slash is rejected by `ComponentFilePath::from_abs_str`.
#[test]
fn initial_component_file_path_from_relative() {
    assert!(ComponentFilePath::from_abs_str("a/b/c").is_err());
}
2422}