golem_common/model/
mod.rs

1// Copyright 2024-2025 Golem Cloud
2//
3// Licensed under the Golem Source License v1.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://license.golem.cloud/LICENSE
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::model::oplog::{
16    IndexedResourceKey, OplogEntry, TimestampedUpdateDescription, WorkerResourceId,
17};
18use crate::model::regions::DeletedRegions;
19use bincode::de::{BorrowDecoder, Decoder};
20use bincode::enc::Encoder;
21use bincode::error::{DecodeError, EncodeError};
22use bincode::{BorrowDecode, Decode, Encode};
23
24pub use crate::base_model::*;
25use crate::model::invocation_context::InvocationContextStack;
26use golem_wasm_ast::analysis::analysed_type::{field, list, record, str, tuple, u32, u64};
27use golem_wasm_ast::analysis::AnalysedType;
28use golem_wasm_rpc::{IntoValue, Value};
29use golem_wasm_rpc_derive::IntoValue;
30use http::Uri;
31use rand::prelude::IteratorRandom;
32use serde::de::Unexpected;
33use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
34use std::cmp::Ordering;
35use std::collections::{HashMap, HashSet, VecDeque};
36use std::fmt::{Display, Formatter};
37use std::ops::Add;
38use std::str::FromStr;
39use std::time::{Duration, SystemTime};
40use typed_path::Utf8UnixPathBuf;
41use uuid::{uuid, Uuid};
42
43pub mod auth;
44pub mod base64;
45pub mod component;
46pub mod component_constraint;
47pub mod component_metadata;
48pub mod config;
49pub mod error;
50pub mod exports;
51pub mod invocation_context;
52pub mod lucene;
53pub mod oplog;
54pub mod plugin;
55pub mod project;
56pub mod public_oplog;
57pub mod regions;
58pub mod trim_date;
59
60#[cfg(feature = "poem")]
61mod poem;
62
63#[cfg(feature = "protobuf")]
64pub mod protobuf;
65
/// Marker trait bundling the `poem_openapi` type traits (`Type`, `ParseFromJSON` and
/// `ToJSON`) behind a single bound when the `poem` feature is enabled.
#[cfg(feature = "poem")]
pub trait PoemTypeRequirements:
    poem_openapi::types::Type + poem_openapi::types::ParseFromJSON + poem_openapi::types::ToJSON
{
}

/// Without the `poem` feature the trait carries no requirements at all.
#[cfg(not(feature = "poem"))]
pub trait PoemTypeRequirements {}

// Blanket implementation: every type satisfying the three poem traits gets the marker.
#[cfg(feature = "poem")]
impl<T> PoemTypeRequirements for T
where
    T: poem_openapi::types::Type
        + poem_openapi::types::ParseFromJSON
        + poem_openapi::types::ToJSON,
{
}

// Blanket implementation: without poem every type gets the marker.
#[cfg(not(feature = "poem"))]
impl<T> PoemTypeRequirements for T {}
86
/// Marker trait standing in for `poem_openapi::types::ParseFromMultipartField` when the
/// `poem` feature is enabled.
#[cfg(feature = "poem")]
pub trait PoemMultipartTypeRequirements: poem_openapi::types::ParseFromMultipartField {}

/// Without the `poem` feature the trait carries no requirements at all.
#[cfg(not(feature = "poem"))]
pub trait PoemMultipartTypeRequirements {}

// Blanket implementation for every type that can be parsed from a multipart field.
#[cfg(feature = "poem")]
impl<T> PoemMultipartTypeRequirements for T where T: poem_openapi::types::ParseFromMultipartField {}

// Blanket implementation: without poem every type gets the marker.
#[cfg(not(feature = "poem"))]
impl<T> PoemMultipartTypeRequirements for T {}
98
/// Newtype wrapper around `iso8601_timestamp::Timestamp`.
///
/// Comparison, ordering and hashing are derived and therefore delegate to the wrapped value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub struct Timestamp(iso8601_timestamp::Timestamp);
102
103impl Timestamp {
104    pub fn now_utc() -> Timestamp {
105        Timestamp(iso8601_timestamp::Timestamp::now_utc())
106    }
107
108    pub fn to_millis(&self) -> u64 {
109        self.0
110            .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
111            .whole_milliseconds() as u64
112    }
113}
114
115impl Display for Timestamp {
116    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
117        write!(f, "{}", self.0)
118    }
119}
120
121impl FromStr for Timestamp {
122    type Err = String;
123
124    fn from_str(s: &str) -> Result<Self, Self::Err> {
125        match iso8601_timestamp::Timestamp::parse(s) {
126            Some(ts) => Ok(Self(ts)),
127            None => Err("Invalid timestamp".to_string()),
128        }
129    }
130}
131
132impl serde::Serialize for Timestamp {
133    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
134    where
135        S: Serializer,
136    {
137        self.0.serialize(serializer)
138    }
139}
140
141impl<'de> serde::Deserialize<'de> for Timestamp {
142    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
143    where
144        D: Deserializer<'de>,
145    {
146        if deserializer.is_human_readable() {
147            iso8601_timestamp::Timestamp::deserialize(deserializer).map(Self)
148        } else {
149            // For non-human-readable formats we assume it was an i64 representing milliseconds from epoch
150            let timestamp = i64::deserialize(deserializer)?;
151            Ok(Timestamp(
152                iso8601_timestamp::Timestamp::UNIX_EPOCH
153                    .add(Duration::from_millis(timestamp as u64)),
154            ))
155        }
156    }
157}
158
159impl bincode::Encode for Timestamp {
160    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
161        (self
162            .0
163            .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
164            .whole_milliseconds() as i64)
165            .encode(encoder)
166    }
167}
168
169impl<Context> bincode::Decode<Context> for Timestamp {
170    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
171        let timestamp: i64 = bincode::Decode::decode(decoder)?;
172        Ok(Timestamp(
173            iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
174        ))
175    }
176}
177
178impl<'de, Context> bincode::BorrowDecode<'de, Context> for Timestamp {
179    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
180        decoder: &mut D,
181    ) -> Result<Self, DecodeError> {
182        let timestamp: i64 = bincode::BorrowDecode::borrow_decode(decoder)?;
183        Ok(Timestamp(
184            iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
185        ))
186    }
187}
188
189impl From<u64> for Timestamp {
190    fn from(value: u64) -> Self {
191        Timestamp(iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(value)))
192    }
193}
194
195impl IntoValue for Timestamp {
196    fn into_value(self) -> Value {
197        let d = self
198            .0
199            .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH);
200        Value::Record(vec![
201            Value::U64(d.whole_seconds() as u64),
202            Value::U32(d.subsec_nanoseconds() as u32),
203        ])
204    }
205
206    fn get_type() -> AnalysedType {
207        record(vec![field("seconds", u64()), field("nanoseconds", u32())])
208    }
209}
210
/// Associates a worker-id with its owner account
///
/// NOTE(review): `Encode`/`Decode` are derived, so the field order presumably is part of
/// the binary representation — confirm before reordering.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode)]
pub struct OwnedWorkerId {
    /// The account owning the worker.
    pub account_id: AccountId,
    /// The worker being owned.
    pub worker_id: WorkerId,
}
217
218impl OwnedWorkerId {
219    pub fn new(account_id: &AccountId, worker_id: &WorkerId) -> Self {
220        Self {
221            account_id: account_id.clone(),
222            worker_id: worker_id.clone(),
223        }
224    }
225
226    pub fn worker_id(&self) -> WorkerId {
227        self.worker_id.clone()
228    }
229
230    pub fn account_id(&self) -> AccountId {
231        self.account_id.clone()
232    }
233
234    pub fn component_id(&self) -> ComponentId {
235        self.worker_id.component_id.clone()
236    }
237
238    pub fn worker_name(&self) -> String {
239        self.worker_id.worker_name.clone()
240    }
241}
242
243impl Display for OwnedWorkerId {
244    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
245        write!(f, "{}/{}", self.account_id, self.worker_id)
246    }
247}
248
249impl AsRef<WorkerId> for OwnedWorkerId {
250    fn as_ref(&self) -> &WorkerId {
251        &self.worker_id
252    }
253}
254
/// Actions that can be scheduled to be executed at a given point in time
///
/// NOTE(review): `Encode`/`Decode` are derived, so the order of variants and fields
/// presumably is part of the binary representation — confirm before reordering.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub enum ScheduledAction {
    /// Completes a given promise
    CompletePromise {
        /// Account used together with the promise's worker id to form the owned worker id.
        account_id: AccountId,
        /// The promise to complete; it carries the id of the worker it belongs to.
        promise_id: PromiseId,
    },
    /// Archives all entries from the first non-empty layer of an oplog to the next layer,
    /// if the last oplog index did not change. If there are more layers below, schedules
    /// a next action to archive the next layer.
    ArchiveOplog {
        owned_worker_id: OwnedWorkerId,
        last_oplog_index: OplogIndex,
        next_after: Duration,
    },
    /// Invoke the given action on the worker. The invocation will only
    /// be persisted in the oplog when it's actually getting scheduled.
    Invoke {
        owned_worker_id: OwnedWorkerId,
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
281
282impl ScheduledAction {
283    pub fn owned_worker_id(&self) -> OwnedWorkerId {
284        match self {
285            ScheduledAction::CompletePromise {
286                account_id,
287                promise_id,
288            } => OwnedWorkerId::new(account_id, &promise_id.worker_id),
289            ScheduledAction::ArchiveOplog {
290                owned_worker_id, ..
291            } => owned_worker_id.clone(),
292            ScheduledAction::Invoke {
293                owned_worker_id, ..
294            } => owned_worker_id.clone(),
295        }
296    }
297}
298
299impl Display for ScheduledAction {
300    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
301        match self {
302            ScheduledAction::CompletePromise { promise_id, .. } => {
303                write!(f, "complete[{}]", promise_id)
304            }
305            ScheduledAction::ArchiveOplog {
306                owned_worker_id, ..
307            } => {
308                write!(f, "archive[{}]", owned_worker_id)
309            }
310            ScheduledAction::Invoke {
311                owned_worker_id, ..
312            } => write!(f, "invoke[{}]", owned_worker_id),
313        }
314    }
315}
316
/// Pairs a scheduled action with a timestamp.
///
/// NOTE(review): the unit of `timestamp` is not visible here — confirm against the scheduler.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ScheduleId {
    pub timestamp: i64,
    pub action: ScheduledAction,
}
322
323impl Display for ScheduleId {
324    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
325        write!(f, "{}@{}", self.action, self.timestamp)
326    }
327}
328
/// Wrapper around the total shard count.
#[derive(Clone)]
pub struct NumberOfShards {
    /// The number of shards.
    pub value: usize,
}
333
/// Network location (host and port) of a pod.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Pod {
    // Host part of the pod's address.
    host: String,
    // Port part of the pod's address.
    port: u16,
}
339
340impl Pod {
341    pub fn uri(&self) -> Uri {
342        Uri::builder()
343            .scheme("http")
344            .authority(format!("{}:{}", self.host, self.port).as_str())
345            .path_and_query("/")
346            .build()
347            .expect("Failed to build URI")
348    }
349}
350
/// Maps shards to the pods they are assigned to.
#[derive(Clone)]
pub struct RoutingTable {
    /// Total number of shards; used when computing the shard of a worker id.
    pub number_of_shards: NumberOfShards,
    // Pod assigned to each shard.
    shard_assignments: HashMap<ShardId, Pod>,
}
356
357impl RoutingTable {
358    pub fn lookup(&self, worker_id: &WorkerId) -> Option<&Pod> {
359        self.shard_assignments.get(&ShardId::from_worker_id(
360            &worker_id.clone(),
361            self.number_of_shards.value,
362        ))
363    }
364
365    pub fn random(&self) -> Option<&Pod> {
366        self.shard_assignments.values().choose(&mut rand::rng())
367    }
368
369    pub fn first(&self) -> Option<&Pod> {
370        self.shard_assignments.values().next()
371    }
372
373    pub fn all(&self) -> HashSet<&Pod> {
374        self.shard_assignments.values().collect()
375    }
376}
377
/// A single shard-to-pod pairing.
// The fields are never read in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
pub struct RoutingTableEntry {
    shard_id: ShardId,
    pod: Pod,
}
383
/// A set of shard ids together with the total number of shards.
#[derive(Clone, Debug, Default)]
pub struct ShardAssignment {
    /// Total number of shards.
    pub number_of_shards: usize,
    /// The shard ids currently held by this assignment.
    pub shard_ids: HashSet<ShardId>,
}
389
390impl ShardAssignment {
391    pub fn new(number_of_shards: usize, shard_ids: HashSet<ShardId>) -> Self {
392        Self {
393            number_of_shards,
394            shard_ids,
395        }
396    }
397
398    pub fn assign_shards(&mut self, shard_ids: &HashSet<ShardId>) {
399        for shard_id in shard_ids {
400            self.shard_ids.insert(*shard_id);
401        }
402    }
403
404    pub fn register(&mut self, number_of_shards: usize, shard_ids: &HashSet<ShardId>) {
405        self.number_of_shards = number_of_shards;
406        for shard_id in shard_ids {
407            self.shard_ids.insert(*shard_id);
408        }
409    }
410
411    pub fn revoke_shards(&mut self, shard_ids: &HashSet<ShardId>) {
412        for shard_id in shard_ids {
413            self.shard_ids.remove(shard_id);
414        }
415    }
416}
417
418impl Display for ShardAssignment {
419    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
420        let shard_ids = self
421            .shard_ids
422            .iter()
423            .map(|shard_id| shard_id.to_string())
424            .collect::<Vec<_>>()
425            .join(",");
426        write!(
427            f,
428            "{{ number_of_shards: {}, shard_ids: {} }}",
429            self.number_of_shards, shard_ids
430        )
431    }
432}
433
/// String-valued idempotency key.
///
/// The value can be an arbitrary user-provided string or the textual form of a UUID.
#[derive(Clone, Debug, Encode, Decode, Eq, Hash, PartialEq, IntoValue)]
#[wit_transparent]
pub struct IdempotencyKey {
    /// The raw key.
    pub value: String,
}
439
440impl IdempotencyKey {
441    const ROOT_NS: Uuid = uuid!("9C19B15A-C83D-46F7-9BC3-EAD7923733F4");
442
443    pub fn new(value: String) -> Self {
444        Self { value }
445    }
446
447    pub fn from_uuid(value: Uuid) -> Self {
448        Self {
449            value: value.to_string(),
450        }
451    }
452
453    pub fn fresh() -> Self {
454        Self::from_uuid(Uuid::new_v4())
455    }
456
457    /// Generates a deterministic new idempotency key using a base idempotency key and an oplog index.
458    ///
459    /// The base idempotency key determines the "namespace" of the generated key UUIDv5. If
460    /// the base idempotency key is already an UUID, it is directly used as the namespace of the v5 algorithm,
461    /// while the name part is derived from the given oplog index.
462    ///
463    /// If the base idempotency key is not an UUID (as it can be an arbitrary user-provided string), then first
464    /// we generate a UUIDv5 in the ROOT_NS namespace and use that as unique namespace for generating
465    /// the new idempotency key.
466    pub fn derived(base: &IdempotencyKey, oplog_index: OplogIndex) -> Self {
467        let namespace = if let Ok(base_uuid) = Uuid::parse_str(&base.value) {
468            base_uuid
469        } else {
470            Uuid::new_v5(&Self::ROOT_NS, base.value.as_bytes())
471        };
472        let name = format!("oplog-index-{}", oplog_index);
473        Self::from_uuid(Uuid::new_v5(&namespace, name.as_bytes()))
474    }
475}
476
477impl Serialize for IdempotencyKey {
478    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
479    where
480        S: Serializer,
481    {
482        self.value.serialize(serializer)
483    }
484}
485
486impl<'de> Deserialize<'de> for IdempotencyKey {
487    fn deserialize<D>(deserializer: D) -> Result<IdempotencyKey, D::Error>
488    where
489        D: Deserializer<'de>,
490    {
491        let value = String::deserialize(deserializer)?;
492        Ok(IdempotencyKey { value })
493    }
494}
495
496impl Display for IdempotencyKey {
497    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
498        write!(f, "{}", self.value)
499    }
500}
501
/// Metadata describing a worker.
#[derive(Clone, Debug)]
pub struct WorkerMetadata {
    pub worker_id: WorkerId,
    /// Argument strings associated with the worker.
    pub args: Vec<String>,
    /// Environment entries as (name, value) pairs — NOTE(review): pair meaning assumed, confirm.
    pub env: Vec<(String, String)>,
    /// The account owning the worker (see `owned_worker_id`).
    pub account_id: AccountId,
    pub created_at: Timestamp,
    /// The parent worker, if any.
    pub parent: Option<WorkerId>,
    /// The most recently recorded status information.
    pub last_known_status: WorkerStatusRecord,
}
512
513impl WorkerMetadata {
514    pub fn default(worker_id: WorkerId, account_id: AccountId) -> WorkerMetadata {
515        WorkerMetadata {
516            worker_id,
517            args: vec![],
518            env: vec![],
519            account_id,
520            created_at: Timestamp::now_utc(),
521            parent: None,
522            last_known_status: WorkerStatusRecord::default(),
523        }
524    }
525
526    pub fn owned_worker_id(&self) -> OwnedWorkerId {
527        OwnedWorkerId::new(&self.account_id, &self.worker_id)
528    }
529}
530
531impl IntoValue for WorkerMetadata {
532    fn into_value(self) -> Value {
533        Value::Record(vec![
534            self.worker_id.into_value(),
535            self.args.into_value(),
536            self.env.into_value(),
537            self.last_known_status.status.into_value(),
538            self.last_known_status.component_version.into_value(),
539            0u64.into_value(), // retry count could be computed from the worker status record here but we don't support it yet
540        ])
541    }
542
543    fn get_type() -> AnalysedType {
544        record(vec![
545            field("worker-id", WorkerId::get_type()),
546            field("args", list(str())),
547            field("env", list(tuple(vec![str(), str()]))),
548            field("status", WorkerStatus::get_type()),
549            field("component-version", u64()),
550            field("retry-count", u64()),
551        ])
552    }
553}
554
/// Describes a resource owned by a worker.
///
/// NOTE(review): `Encode`/`Decode` are derived, so the field order presumably is part of
/// the binary representation — confirm before reordering.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct WorkerResourceDescription {
    pub created_at: Timestamp,
    /// Present only when the resource has an indexed resource key.
    pub indexed_resource_key: Option<IndexedResourceKey>,
}
560
/// Retry settings.
///
/// With serde the two delays are (de)serialized through `humantime_serde`.
/// NOTE(review): the exact backoff formula that consumes these fields is not visible in
/// this file — confirm field semantics against the retry implementation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
pub struct RetryConfig {
    pub max_attempts: u32,
    #[serde(with = "humantime_serde")]
    pub min_delay: Duration,
    #[serde(with = "humantime_serde")]
    pub max_delay: Duration,
    pub multiplier: f64,
    pub max_jitter_factor: Option<f64>,
}
571
/// Contains status information about a worker according to a given oplog index.
///
/// This status is just cached information, all fields must be computable by the oplog alone.
/// By having an associated oplog_idx, the cached information can be used together with the
/// tail of the oplog to determine the actual status of the worker.
///
/// Only `Encode` is derived; `Decode` and `BorrowDecode` are implemented by hand in this
/// file and read the fields in declaration order, so the field order here must stay in sync
/// with those implementations.
#[derive(Clone, Debug, PartialEq, Encode)]
pub struct WorkerStatusRecord {
    pub status: WorkerStatus,
    pub skipped_regions: DeletedRegions,
    pub overridden_retry_config: Option<RetryConfig>,
    pub pending_invocations: Vec<TimestampedWorkerInvocation>,
    pub pending_updates: VecDeque<TimestampedUpdateDescription>,
    pub failed_updates: Vec<FailedUpdateRecord>,
    pub successful_updates: Vec<SuccessfulUpdateRecord>,
    pub invocation_results: HashMap<IdempotencyKey, OplogIndex>,
    pub current_idempotency_key: Option<IdempotencyKey>,
    pub component_version: ComponentVersion,
    pub component_size: u64,
    pub total_linear_memory_size: u64,
    pub owned_resources: HashMap<WorkerResourceId, WorkerResourceDescription>,
    /// The oplog index this cached status corresponds to.
    pub oplog_idx: OplogIndex,
    pub active_plugins: HashSet<PluginInstallationId>,
    pub deleted_regions: DeletedRegions,
    /// The component version at the starting point of the replay. Will be the version of the Create oplog entry
    /// if only automatic updates were used or the version of the latest snapshot based update
    pub component_version_for_replay: ComponentVersion,
}
599
// Hand-written decoder for `WorkerStatusRecord`.
//
// Fields are read one after another in the order in which the struct declares them, so the
// order of the lines below is significant.
// NOTE(review): this presumably mirrors what the derived `Encode` writes — confirm before
// adding, removing or reordering fields.
impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
        Ok(Self {
            status: Decode::decode(decoder)?,
            skipped_regions: Decode::decode(decoder)?,
            overridden_retry_config: Decode::decode(decoder)?,
            pending_invocations: Decode::decode(decoder)?,
            pending_updates: Decode::decode(decoder)?,
            failed_updates: Decode::decode(decoder)?,
            successful_updates: Decode::decode(decoder)?,
            invocation_results: Decode::decode(decoder)?,
            current_idempotency_key: Decode::decode(decoder)?,
            component_version: Decode::decode(decoder)?,
            component_size: Decode::decode(decoder)?,
            total_linear_memory_size: Decode::decode(decoder)?,
            owned_resources: Decode::decode(decoder)?,
            oplog_idx: Decode::decode(decoder)?,
            active_plugins: Decode::decode(decoder)?,
            deleted_regions: Decode::decode(decoder)?,
            component_version_for_replay: Decode::decode(decoder)?,
        })
    }
}
// Hand-written borrow-decoder for `WorkerStatusRecord`.
//
// Reads the same fields in the same order as the `Decode` implementation, using
// `BorrowDecode` for each field; the order of the lines below is significant.
impl<'de, Context> BorrowDecode<'de, Context> for WorkerStatusRecord {
    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
        decoder: &mut D,
    ) -> Result<Self, DecodeError> {
        Ok(Self {
            status: BorrowDecode::borrow_decode(decoder)?,
            skipped_regions: BorrowDecode::borrow_decode(decoder)?,
            overridden_retry_config: BorrowDecode::borrow_decode(decoder)?,
            pending_invocations: BorrowDecode::borrow_decode(decoder)?,
            pending_updates: BorrowDecode::borrow_decode(decoder)?,
            failed_updates: BorrowDecode::borrow_decode(decoder)?,
            successful_updates: BorrowDecode::borrow_decode(decoder)?,
            invocation_results: BorrowDecode::borrow_decode(decoder)?,
            current_idempotency_key: BorrowDecode::borrow_decode(decoder)?,
            component_version: BorrowDecode::borrow_decode(decoder)?,
            component_size: BorrowDecode::borrow_decode(decoder)?,
            total_linear_memory_size: BorrowDecode::borrow_decode(decoder)?,
            owned_resources: BorrowDecode::borrow_decode(decoder)?,
            oplog_idx: BorrowDecode::borrow_decode(decoder)?,
            active_plugins: BorrowDecode::borrow_decode(decoder)?,
            deleted_regions: BorrowDecode::borrow_decode(decoder)?,
            component_version_for_replay: BorrowDecode::borrow_decode(decoder)?,
        })
    }
}
648
649impl Default for WorkerStatusRecord {
650    fn default() -> Self {
651        WorkerStatusRecord {
652            status: WorkerStatus::Idle,
653            skipped_regions: DeletedRegions::new(),
654            overridden_retry_config: None,
655            pending_invocations: Vec::new(),
656            pending_updates: VecDeque::new(),
657            failed_updates: Vec::new(),
658            successful_updates: Vec::new(),
659            invocation_results: HashMap::new(),
660            current_idempotency_key: None,
661            component_version: 0,
662            component_size: 0,
663            total_linear_memory_size: 0,
664            owned_resources: HashMap::new(),
665            oplog_idx: OplogIndex::default(),
666            active_plugins: HashSet::new(),
667            deleted_regions: DeletedRegions::new(),
668            component_version_for_replay: 0,
669        }
670    }
671}
672
/// Record of a failed update attempt.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct FailedUpdateRecord {
    pub timestamp: Timestamp,
    /// The component version the update was targeting.
    pub target_version: ComponentVersion,
    /// Optional free-form details about the failure.
    pub details: Option<String>,
}
679
/// Record of a successful update.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct SuccessfulUpdateRecord {
    pub timestamp: Timestamp,
    /// The component version the update was targeting.
    pub target_version: ComponentVersion,
}
685
/// Represents last known status of a worker
///
/// This is always recorded together with the current oplog index, and it can only be used
/// as a source of truth if there are no newer oplog entries since the record.
///
/// The variants map to the integers 0..=6 in declaration order through the `i32`
/// conversions in this file, and ordering (`Ord`) compares those integers.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum WorkerStatus {
    /// The worker is running an invoked function
    Running,
    /// The worker is ready to run an invoked function
    Idle,
    /// An invocation is active but waiting for something (sleeping, waiting for a promise)
    Suspended,
    /// The last invocation was interrupted but will be resumed
    Interrupted,
    /// The last invocation failed and a retry was scheduled
    Retrying,
    /// The last invocation failed and the worker can no longer be used
    Failed,
    /// The worker exited after a successful invocation and can no longer be invoked
    Exited,
}
708
709impl PartialOrd for WorkerStatus {
710    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
711        Some(self.cmp(other))
712    }
713}
714
715impl Ord for WorkerStatus {
716    fn cmp(&self, other: &Self) -> Ordering {
717        let v1: i32 = self.clone().into();
718        let v2: i32 = other.clone().into();
719        v1.cmp(&v2)
720    }
721}
722
723impl FromStr for WorkerStatus {
724    type Err = String;
725
726    fn from_str(s: &str) -> Result<Self, Self::Err> {
727        match s.to_lowercase().as_str() {
728            "running" => Ok(WorkerStatus::Running),
729            "idle" => Ok(WorkerStatus::Idle),
730            "suspended" => Ok(WorkerStatus::Suspended),
731            "interrupted" => Ok(WorkerStatus::Interrupted),
732            "retrying" => Ok(WorkerStatus::Retrying),
733            "failed" => Ok(WorkerStatus::Failed),
734            "exited" => Ok(WorkerStatus::Exited),
735            _ => Err(format!("Unknown worker status: {}", s)),
736        }
737    }
738}
739
740impl Display for WorkerStatus {
741    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
742        match self {
743            WorkerStatus::Running => write!(f, "Running"),
744            WorkerStatus::Idle => write!(f, "Idle"),
745            WorkerStatus::Suspended => write!(f, "Suspended"),
746            WorkerStatus::Interrupted => write!(f, "Interrupted"),
747            WorkerStatus::Retrying => write!(f, "Retrying"),
748            WorkerStatus::Failed => write!(f, "Failed"),
749            WorkerStatus::Exited => write!(f, "Exited"),
750        }
751    }
752}
753
754impl TryFrom<i32> for WorkerStatus {
755    type Error = String;
756
757    fn try_from(value: i32) -> Result<Self, Self::Error> {
758        match value {
759            0 => Ok(WorkerStatus::Running),
760            1 => Ok(WorkerStatus::Idle),
761            2 => Ok(WorkerStatus::Suspended),
762            3 => Ok(WorkerStatus::Interrupted),
763            4 => Ok(WorkerStatus::Retrying),
764            5 => Ok(WorkerStatus::Failed),
765            6 => Ok(WorkerStatus::Exited),
766            _ => Err(format!("Unknown worker status: {}", value)),
767        }
768    }
769}
770
771impl From<WorkerStatus> for i32 {
772    fn from(value: WorkerStatus) -> Self {
773        match value {
774            WorkerStatus::Running => 0,
775            WorkerStatus::Idle => 1,
776            WorkerStatus::Suspended => 2,
777            WorkerStatus::Interrupted => 3,
778            WorkerStatus::Retrying => 4,
779            WorkerStatus::Failed => 5,
780            WorkerStatus::Exited => 6,
781        }
782    }
783}
784
/// Internal representation of `WorkerInvocation` to support backward compatibility
/// in its binary format.
///
/// NOTE(review): `Encode`/`Decode` are derived, so the variant order presumably determines
/// the encoded discriminants — confirm, and append new variants at the end.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
enum SerializedWorkerInvocation {
    /// Older form of an exported function invocation, without an invocation context.
    /// It is only read: converting a `WorkerInvocation` into this type never produces it.
    ExportedFunctionV1 {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
    },
    ManualUpdate {
        target_version: ComponentVersion,
    },
    /// Current form of an exported function invocation, including the invocation context.
    ExportedFunction {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
804
805impl From<WorkerInvocation> for SerializedWorkerInvocation {
806    fn from(value: WorkerInvocation) -> Self {
807        match value {
808            WorkerInvocation::ManualUpdate { target_version } => {
809                Self::ManualUpdate { target_version }
810            }
811            WorkerInvocation::ExportedFunction {
812                idempotency_key,
813                full_function_name,
814                function_input,
815                invocation_context,
816            } => Self::ExportedFunction {
817                idempotency_key,
818                full_function_name,
819                function_input,
820                invocation_context,
821            },
822        }
823    }
824}
825
826impl From<SerializedWorkerInvocation> for WorkerInvocation {
827    fn from(value: SerializedWorkerInvocation) -> Self {
828        match value {
829            SerializedWorkerInvocation::ExportedFunctionV1 {
830                idempotency_key,
831                full_function_name,
832                function_input,
833            } => Self::ExportedFunction {
834                idempotency_key,
835                full_function_name,
836                function_input,
837                invocation_context: InvocationContextStack::fresh(),
838            },
839            SerializedWorkerInvocation::ManualUpdate { target_version } => {
840                Self::ManualUpdate { target_version }
841            }
842            SerializedWorkerInvocation::ExportedFunction {
843                idempotency_key,
844                full_function_name,
845                function_input,
846                invocation_context,
847            } => Self::ExportedFunction {
848                idempotency_key,
849                full_function_name,
850                function_input,
851                invocation_context,
852            },
853        }
854    }
855}
856
/// An invocation targeting a worker.
///
/// Binary encoding and decoding go through `SerializedWorkerInvocation`.
#[derive(Clone, Debug, PartialEq)]
pub enum WorkerInvocation {
    /// A manual update to the given component version.
    ManualUpdate {
        target_version: ComponentVersion,
    },
    /// An invocation of an exported function.
    ExportedFunction {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
869
870impl WorkerInvocation {
871    pub fn is_idempotency_key(&self, key: &IdempotencyKey) -> bool {
872        match self {
873            Self::ExportedFunction {
874                idempotency_key, ..
875            } => idempotency_key == key,
876            _ => false,
877        }
878    }
879
880    pub fn idempotency_key(&self) -> Option<&IdempotencyKey> {
881        match self {
882            Self::ExportedFunction {
883                idempotency_key, ..
884            } => Some(idempotency_key),
885            _ => None,
886        }
887    }
888
889    pub fn invocation_context(&self) -> InvocationContextStack {
890        match self {
891            Self::ExportedFunction {
892                invocation_context, ..
893            } => invocation_context.clone(),
894            _ => InvocationContextStack::fresh(),
895        }
896    }
897}
898
899impl Encode for WorkerInvocation {
900    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
901        let serialized: SerializedWorkerInvocation = self.clone().into();
902        serialized.encode(encoder)
903    }
904}
905
906impl<Context> Decode<Context> for WorkerInvocation {
907    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
908        let serialized: SerializedWorkerInvocation = Decode::decode(decoder)?;
909        Ok(serialized.into())
910    }
911}
912
913impl<'de, Context> BorrowDecode<'de, Context> for WorkerInvocation {
914    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
915        decoder: &mut D,
916    ) -> Result<Self, DecodeError> {
917        let serialized: SerializedWorkerInvocation = BorrowDecode::borrow_decode(decoder)?;
918        Ok(serialized.into())
919    }
920}
921
/// A [`WorkerInvocation`] paired with a timestamp.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct TimestampedWorkerInvocation {
    pub timestamp: Timestamp,
    pub invocation: WorkerInvocation,
}
927
/// Account identifier wrapping a string value.
///
/// Because of `#[serde(transparent)]` it is (de)serialized as the bare
/// inner string rather than as an object with a `value` field.
#[derive(
    Clone,
    Debug,
    PartialOrd,
    Ord,
    derive_more::FromStr,
    Eq,
    Hash,
    PartialEq,
    Serialize,
    Deserialize,
    Encode,
    Decode,
    IntoValue,
)]
#[serde(transparent)]
pub struct AccountId {
    pub value: String,
}
947
948impl AccountId {
949    pub fn placeholder() -> Self {
950        Self {
951            value: "-1".to_string(),
952        }
953    }
954
955    pub fn generate() -> Self {
956        Self {
957            value: Uuid::new_v4().to_string(),
958        }
959    }
960}
961
962impl From<&str> for AccountId {
963    fn from(value: &str) -> Self {
964        Self {
965            value: value.to_string(),
966        }
967    }
968}
969
970impl Display for AccountId {
971    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
972        write!(f, "{}", &self.value)
973    }
974}
975
// Filter on the worker name: a string comparator plus the value to compare
// against. Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerNameFilter {
    pub comparator: StringFilterComparator,
    pub value: String,
}
984
985impl WorkerNameFilter {
986    pub fn new(comparator: StringFilterComparator, value: String) -> Self {
987        Self { comparator, value }
988    }
989}
990
991impl Display for WorkerNameFilter {
992    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
993        write!(f, "name {} {}", self.comparator, self.value)
994    }
995}
996
// Filter on the worker status: a comparator plus the status to compare
// against. Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerStatusFilter {
    pub comparator: FilterComparator,
    pub value: WorkerStatus,
}
1005
1006impl WorkerStatusFilter {
1007    pub fn new(comparator: FilterComparator, value: WorkerStatus) -> Self {
1008        Self { comparator, value }
1009    }
1010}
1011
1012impl Display for WorkerStatusFilter {
1013    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1014        write!(f, "status == {:?}", self.value)
1015    }
1016}
1017
// Filter on the component version: a comparator plus the version to compare
// against. Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerVersionFilter {
    pub comparator: FilterComparator,
    pub value: ComponentVersion,
}
1026
1027impl WorkerVersionFilter {
1028    pub fn new(comparator: FilterComparator, value: ComponentVersion) -> Self {
1029        Self { comparator, value }
1030    }
1031}
1032
1033impl Display for WorkerVersionFilter {
1034    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1035        write!(f, "version {} {}", self.comparator, self.value)
1036    }
1037}
1038
// Filter on the worker creation time: a comparator plus the timestamp to
// compare against. Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerCreatedAtFilter {
    pub comparator: FilterComparator,
    pub value: Timestamp,
}
1047
1048impl WorkerCreatedAtFilter {
1049    pub fn new(comparator: FilterComparator, value: Timestamp) -> Self {
1050        Self { comparator, value }
1051    }
1052}
1053
1054impl Display for WorkerCreatedAtFilter {
1055    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1056        write!(f, "created_at {} {}", self.comparator, self.value)
1057    }
1058}
1059
// Filter on an environment variable: the variable name, a string comparator
// and the value to compare against. Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerEnvFilter {
    pub name: String,
    pub comparator: StringFilterComparator,
    pub value: String,
}
1069
1070impl WorkerEnvFilter {
1071    pub fn new(name: String, comparator: StringFilterComparator, value: String) -> Self {
1072        Self {
1073            name,
1074            comparator,
1075            value,
1076        }
1077    }
1078}
1079
1080impl Display for WorkerEnvFilter {
1081    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1082        write!(f, "env.{} {} {}", self.name, self.comparator, self.value)
1083    }
1084}
1085
// Conjunction of filters: holds the list of sub-filters combined with AND.
// Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerAndFilter {
    pub filters: Vec<WorkerFilter>,
}
1093
1094impl WorkerAndFilter {
1095    pub fn new(filters: Vec<WorkerFilter>) -> Self {
1096        Self { filters }
1097    }
1098}
1099
1100impl Display for WorkerAndFilter {
1101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1102        write!(
1103            f,
1104            "({})",
1105            self.filters
1106                .iter()
1107                .map(|f| f.clone().to_string())
1108                .collect::<Vec<String>>()
1109                .join(" AND ")
1110        )
1111    }
1112}
1113
// Disjunction of filters: holds the list of sub-filters combined with OR.
// Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerOrFilter {
    pub filters: Vec<WorkerFilter>,
}
1121
1122impl WorkerOrFilter {
1123    pub fn new(filters: Vec<WorkerFilter>) -> Self {
1124        Self { filters }
1125    }
1126}
1127
1128impl Display for WorkerOrFilter {
1129    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1130        write!(
1131            f,
1132            "({})",
1133            self.filters
1134                .iter()
1135                .map(|f| f.clone().to_string())
1136                .collect::<Vec<String>>()
1137                .join(" OR ")
1138        )
1139    }
1140}
1141
// Negation of a single boxed filter. Unlike the sibling filter structs, the
// `filter` field is not public. Serialized with camelCase field names.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerNotFilter {
    filter: Box<WorkerFilter>,
}
1149
1150impl WorkerNotFilter {
1151    pub fn new(filter: WorkerFilter) -> Self {
1152        Self {
1153            filter: Box::new(filter),
1154        }
1155    }
1156}
1157
1158impl Display for WorkerNotFilter {
1159    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1160        write!(f, "NOT ({})", self.filter)
1161    }
1162}
1163
// Worker filter expression: one of the leaf filters (name, status, version,
// creation time, environment variable) or a combinator (and, or, not).
// With serde it is internally tagged through a `type` field.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum WorkerFilter {
    Name(WorkerNameFilter),
    Status(WorkerStatusFilter),
    Version(WorkerVersionFilter),
    CreatedAt(WorkerCreatedAtFilter),
    Env(WorkerEnvFilter),
    And(WorkerAndFilter),
    Or(WorkerOrFilter),
    Not(WorkerNotFilter),
}
1178
1179impl WorkerFilter {
1180    pub fn and(&self, filter: WorkerFilter) -> Self {
1181        match self.clone() {
1182            WorkerFilter::And(WorkerAndFilter { filters }) => {
1183                Self::new_and([filters, vec![filter]].concat())
1184            }
1185            f => Self::new_and(vec![f, filter]),
1186        }
1187    }
1188
1189    pub fn or(&self, filter: WorkerFilter) -> Self {
1190        match self.clone() {
1191            WorkerFilter::Or(WorkerOrFilter { filters }) => {
1192                Self::new_or([filters, vec![filter]].concat())
1193            }
1194            f => Self::new_or(vec![f, filter]),
1195        }
1196    }
1197
1198    pub fn not(&self) -> Self {
1199        Self::new_not(self.clone())
1200    }
1201
1202    pub fn matches(&self, metadata: &WorkerMetadata) -> bool {
1203        match self.clone() {
1204            WorkerFilter::Name(WorkerNameFilter { comparator, value }) => {
1205                comparator.matches(&metadata.worker_id.worker_name, &value)
1206            }
1207            WorkerFilter::Version(WorkerVersionFilter { comparator, value }) => {
1208                let version: ComponentVersion = metadata.last_known_status.component_version;
1209                comparator.matches(&version, &value)
1210            }
1211            WorkerFilter::Env(WorkerEnvFilter {
1212                name,
1213                comparator,
1214                value,
1215            }) => {
1216                let mut result = false;
1217                let name = name.to_lowercase();
1218                for env_value in metadata.env.clone() {
1219                    if env_value.0.to_lowercase() == name {
1220                        result = comparator.matches(&env_value.1, &value);
1221
1222                        break;
1223                    }
1224                }
1225                result
1226            }
1227            WorkerFilter::CreatedAt(WorkerCreatedAtFilter { comparator, value }) => {
1228                comparator.matches(&metadata.created_at, &value)
1229            }
1230            WorkerFilter::Status(WorkerStatusFilter { comparator, value }) => {
1231                comparator.matches(&metadata.last_known_status.status, &value)
1232            }
1233            WorkerFilter::Not(WorkerNotFilter { filter }) => !filter.matches(metadata),
1234            WorkerFilter::And(WorkerAndFilter { filters }) => {
1235                let mut result = true;
1236                for filter in filters {
1237                    if !filter.matches(metadata) {
1238                        result = false;
1239                        break;
1240                    }
1241                }
1242                result
1243            }
1244            WorkerFilter::Or(WorkerOrFilter { filters }) => {
1245                let mut result = true;
1246                if !filters.is_empty() {
1247                    result = false;
1248                    for filter in filters {
1249                        if filter.matches(metadata) {
1250                            result = true;
1251                            break;
1252                        }
1253                    }
1254                }
1255                result
1256            }
1257        }
1258    }
1259
1260    pub fn new_and(filters: Vec<WorkerFilter>) -> Self {
1261        WorkerFilter::And(WorkerAndFilter::new(filters))
1262    }
1263
1264    pub fn new_or(filters: Vec<WorkerFilter>) -> Self {
1265        WorkerFilter::Or(WorkerOrFilter::new(filters))
1266    }
1267
1268    pub fn new_not(filter: WorkerFilter) -> Self {
1269        WorkerFilter::Not(WorkerNotFilter::new(filter))
1270    }
1271
1272    pub fn new_name(comparator: StringFilterComparator, value: String) -> Self {
1273        WorkerFilter::Name(WorkerNameFilter::new(comparator, value))
1274    }
1275
1276    pub fn new_env(name: String, comparator: StringFilterComparator, value: String) -> Self {
1277        WorkerFilter::Env(WorkerEnvFilter::new(name, comparator, value))
1278    }
1279
1280    pub fn new_version(comparator: FilterComparator, value: ComponentVersion) -> Self {
1281        WorkerFilter::Version(WorkerVersionFilter::new(comparator, value))
1282    }
1283
1284    pub fn new_status(comparator: FilterComparator, value: WorkerStatus) -> Self {
1285        WorkerFilter::Status(WorkerStatusFilter::new(comparator, value))
1286    }
1287
1288    pub fn new_created_at(comparator: FilterComparator, value: Timestamp) -> Self {
1289        WorkerFilter::CreatedAt(WorkerCreatedAtFilter::new(comparator, value))
1290    }
1291
1292    pub fn from(filters: Vec<String>) -> Result<WorkerFilter, String> {
1293        let mut fs = Vec::new();
1294        for f in filters {
1295            fs.push(WorkerFilter::from_str(&f)?);
1296        }
1297        Ok(WorkerFilter::new_and(fs))
1298    }
1299}
1300
1301impl Display for WorkerFilter {
1302    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1303        match self {
1304            WorkerFilter::Name(filter) => {
1305                write!(f, "{}", filter)
1306            }
1307            WorkerFilter::Version(filter) => {
1308                write!(f, "{}", filter)
1309            }
1310            WorkerFilter::Status(filter) => {
1311                write!(f, "{}", filter)
1312            }
1313            WorkerFilter::CreatedAt(filter) => {
1314                write!(f, "{}", filter)
1315            }
1316            WorkerFilter::Env(filter) => {
1317                write!(f, "{}", filter)
1318            }
1319            WorkerFilter::Not(filter) => {
1320                write!(f, "{}", filter)
1321            }
1322            WorkerFilter::And(filter) => {
1323                write!(f, "{}", filter)
1324            }
1325            WorkerFilter::Or(filter) => {
1326                write!(f, "{}", filter)
1327            }
1328        }
1329    }
1330}
1331
1332impl FromStr for WorkerFilter {
1333    type Err = String;
1334
1335    fn from_str(s: &str) -> Result<Self, Self::Err> {
1336        let elements = s.split_whitespace().collect::<Vec<&str>>();
1337
1338        if elements.len() == 3 {
1339            let arg = elements[0];
1340            let comparator = elements[1];
1341            let value = elements[2];
1342            match arg {
1343                "name" => Ok(WorkerFilter::new_name(
1344                    comparator.parse()?,
1345                    value.to_string(),
1346                )),
1347                "version" => Ok(WorkerFilter::new_version(
1348                    comparator.parse()?,
1349                    value
1350                        .parse()
1351                        .map_err(|e| format!("Invalid filter value: {}", e))?,
1352                )),
1353                "status" => Ok(WorkerFilter::new_status(
1354                    comparator.parse()?,
1355                    value.parse()?,
1356                )),
1357                "created_at" | "createdAt" => Ok(WorkerFilter::new_created_at(
1358                    comparator.parse()?,
1359                    value.parse()?,
1360                )),
1361                _ if arg.starts_with("env.") => {
1362                    let name = &arg[4..];
1363                    Ok(WorkerFilter::new_env(
1364                        name.to_string(),
1365                        comparator.parse()?,
1366                        value.to_string(),
1367                    ))
1368                }
1369                _ => Err(format!("Invalid filter: {}", s)),
1370            }
1371        } else {
1372            Err(format!("Invalid filter: {}", s))
1373        }
1374    }
1375}
1376
// Comparison operators for string-valued filters: equality, inequality and
// (negated) substring containment.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum StringFilterComparator {
    Equal,
    NotEqual,
    Like,
    NotLike,
}
1385
1386impl StringFilterComparator {
1387    pub fn matches<T: Display>(&self, value1: &T, value2: &T) -> bool {
1388        match self {
1389            StringFilterComparator::Equal => value1.to_string() == value2.to_string(),
1390            StringFilterComparator::NotEqual => value1.to_string() != value2.to_string(),
1391            StringFilterComparator::Like => {
1392                value1.to_string().contains(value2.to_string().as_str())
1393            }
1394            StringFilterComparator::NotLike => {
1395                !value1.to_string().contains(value2.to_string().as_str())
1396            }
1397        }
1398    }
1399}
1400
1401impl FromStr for StringFilterComparator {
1402    type Err = String;
1403
1404    fn from_str(s: &str) -> Result<Self, Self::Err> {
1405        match s.to_lowercase().as_str() {
1406            "==" | "=" | "equal" | "eq" => Ok(StringFilterComparator::Equal),
1407            "!=" | "notequal" | "ne" => Ok(StringFilterComparator::NotEqual),
1408            "like" => Ok(StringFilterComparator::Like),
1409            "notlike" => Ok(StringFilterComparator::NotLike),
1410            _ => Err(format!("Unknown String Filter Comparator: {}", s)),
1411        }
1412    }
1413}
1414
1415impl TryFrom<i32> for StringFilterComparator {
1416    type Error = String;
1417
1418    fn try_from(value: i32) -> Result<Self, Self::Error> {
1419        match value {
1420            0 => Ok(StringFilterComparator::Equal),
1421            1 => Ok(StringFilterComparator::NotEqual),
1422            2 => Ok(StringFilterComparator::Like),
1423            3 => Ok(StringFilterComparator::NotLike),
1424            _ => Err(format!("Unknown String Filter Comparator: {}", value)),
1425        }
1426    }
1427}
1428
1429impl From<StringFilterComparator> for i32 {
1430    fn from(value: StringFilterComparator) -> Self {
1431        match value {
1432            StringFilterComparator::Equal => 0,
1433            StringFilterComparator::NotEqual => 1,
1434            StringFilterComparator::Like => 2,
1435            StringFilterComparator::NotLike => 3,
1436        }
1437    }
1438}
1439
1440impl Display for StringFilterComparator {
1441    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1442        let s = match self {
1443            StringFilterComparator::Equal => "==",
1444            StringFilterComparator::NotEqual => "!=",
1445            StringFilterComparator::Like => "like",
1446            StringFilterComparator::NotLike => "notlike",
1447        };
1448        write!(f, "{}", s)
1449    }
1450}
1451
// Comparison operators for ordered filter values.
// Note: the declaration order of the variants differs from the numeric codes
// used by the `i32` conversions of this type.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum FilterComparator {
    Equal,
    NotEqual,
    GreaterEqual,
    Greater,
    LessEqual,
    Less,
}
1462
1463impl Display for FilterComparator {
1464    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1465        let s = match self {
1466            FilterComparator::Equal => "==",
1467            FilterComparator::NotEqual => "!=",
1468            FilterComparator::GreaterEqual => ">=",
1469            FilterComparator::Greater => ">",
1470            FilterComparator::LessEqual => "<=",
1471            FilterComparator::Less => "<",
1472        };
1473        write!(f, "{}", s)
1474    }
1475}
1476
1477impl FilterComparator {
1478    pub fn matches<T: Ord>(&self, value1: &T, value2: &T) -> bool {
1479        match self {
1480            FilterComparator::Equal => value1 == value2,
1481            FilterComparator::NotEqual => value1 != value2,
1482            FilterComparator::Less => value1 < value2,
1483            FilterComparator::LessEqual => value1 <= value2,
1484            FilterComparator::Greater => value1 > value2,
1485            FilterComparator::GreaterEqual => value1 >= value2,
1486        }
1487    }
1488}
1489
1490impl FromStr for FilterComparator {
1491    type Err = String;
1492    fn from_str(s: &str) -> Result<Self, Self::Err> {
1493        match s.to_lowercase().as_str() {
1494            "==" | "=" | "equal" | "eq" => Ok(FilterComparator::Equal),
1495            "!=" | "notequal" | "ne" => Ok(FilterComparator::NotEqual),
1496            ">=" | "greaterequal" | "ge" => Ok(FilterComparator::GreaterEqual),
1497            ">" | "greater" | "gt" => Ok(FilterComparator::Greater),
1498            "<=" | "lessequal" | "le" => Ok(FilterComparator::LessEqual),
1499            "<" | "less" | "lt" => Ok(FilterComparator::Less),
1500            _ => Err(format!("Unknown Filter Comparator: {}", s)),
1501        }
1502    }
1503}
1504
1505impl TryFrom<i32> for FilterComparator {
1506    type Error = String;
1507
1508    fn try_from(value: i32) -> Result<Self, Self::Error> {
1509        match value {
1510            0 => Ok(FilterComparator::Equal),
1511            1 => Ok(FilterComparator::NotEqual),
1512            2 => Ok(FilterComparator::Less),
1513            3 => Ok(FilterComparator::LessEqual),
1514            4 => Ok(FilterComparator::Greater),
1515            5 => Ok(FilterComparator::GreaterEqual),
1516            _ => Err(format!("Unknown Filter Comparator: {}", value)),
1517        }
1518    }
1519}
1520
1521impl From<FilterComparator> for i32 {
1522    fn from(value: FilterComparator) -> Self {
1523        match value {
1524            FilterComparator::Equal => 0,
1525            FilterComparator::NotEqual => 1,
1526            FilterComparator::Less => 2,
1527            FilterComparator::LessEqual => 3,
1528            FilterComparator::Greater => 4,
1529            FilterComparator::GreaterEqual => 5,
1530        }
1531    }
1532}
1533
// Scan position made of a cursor value and a layer index.
// A cursor of 0 on layer 0 is treated as "finished" by the methods of this
// type; the derived `Default` produces exactly that state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, Default)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ScanCursor {
    pub cursor: u64,
    pub layer: usize,
}
1542
1543impl ScanCursor {
1544    pub fn is_active_layer_finished(&self) -> bool {
1545        self.cursor == 0
1546    }
1547
1548    pub fn is_finished(&self) -> bool {
1549        self.cursor == 0 && self.layer == 0
1550    }
1551
1552    pub fn into_option(self) -> Option<Self> {
1553        if self.is_finished() {
1554            None
1555        } else {
1556            Some(self)
1557        }
1558    }
1559}
1560
1561impl Display for ScanCursor {
1562    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1563        write!(f, "{}/{}", self.layer, self.cursor)
1564    }
1565}
1566
1567impl FromStr for ScanCursor {
1568    type Err = String;
1569
1570    fn from_str(s: &str) -> Result<Self, Self::Err> {
1571        let parts = s.split('/').collect::<Vec<&str>>();
1572        if parts.len() == 2 {
1573            Ok(ScanCursor {
1574                layer: parts[0]
1575                    .parse()
1576                    .map_err(|e| format!("Invalid layer part: {}", e))?,
1577                cursor: parts[1]
1578                    .parse()
1579                    .map_err(|e| format!("Invalid cursor part: {}", e))?,
1580            })
1581        } else {
1582            Err("Invalid cursor, must have 'layer/cursor' format".to_string())
1583        }
1584    }
1585}
1586
/// Log levels that can be attached to a [`WorkerEvent::Log`] event.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
    Critical,
}
1596
/// Events describing worker output and invocation progress.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Raw bytes written to standard output.
    StdOut {
        timestamp: Timestamp,
        bytes: Vec<u8>,
    },
    /// Raw bytes written to standard error.
    StdErr {
        timestamp: Timestamp,
        bytes: Vec<u8>,
    },
    /// A log message with a level and a context string.
    Log {
        timestamp: Timestamp,
        level: LogLevel,
        context: String,
        message: String,
    },
    /// Start of a function invocation, identified by its idempotency key.
    InvocationStart {
        timestamp: Timestamp,
        function: String,
        idempotency_key: IdempotencyKey,
    },
    /// End of a function invocation, identified by its idempotency key.
    InvocationFinished {
        timestamp: Timestamp,
        function: String,
        idempotency_key: IdempotencyKey,
    },
    /// Close marker; carries no data.
    Close,
}
1625
1626impl WorkerEvent {
1627    pub fn stdout(bytes: Vec<u8>) -> WorkerEvent {
1628        WorkerEvent::StdOut {
1629            timestamp: Timestamp::now_utc(),
1630            bytes,
1631        }
1632    }
1633
1634    pub fn stderr(bytes: Vec<u8>) -> WorkerEvent {
1635        WorkerEvent::StdErr {
1636            timestamp: Timestamp::now_utc(),
1637            bytes,
1638        }
1639    }
1640
1641    pub fn log(level: LogLevel, context: &str, message: &str) -> WorkerEvent {
1642        WorkerEvent::Log {
1643            timestamp: Timestamp::now_utc(),
1644            level,
1645            context: context.to_string(),
1646            message: message.to_string(),
1647        }
1648    }
1649
1650    pub fn invocation_start(function: &str, idempotency_key: &IdempotencyKey) -> WorkerEvent {
1651        WorkerEvent::InvocationStart {
1652            timestamp: Timestamp::now_utc(),
1653            function: function.to_string(),
1654            idempotency_key: idempotency_key.clone(),
1655        }
1656    }
1657
1658    pub fn invocation_finished(function: &str, idempotency_key: &IdempotencyKey) -> WorkerEvent {
1659        WorkerEvent::InvocationFinished {
1660            timestamp: Timestamp::now_utc(),
1661            function: function.to_string(),
1662            idempotency_key: idempotency_key.clone(),
1663        }
1664    }
1665
1666    pub fn as_oplog_entry(&self) -> Option<OplogEntry> {
1667        match self {
1668            WorkerEvent::StdOut { timestamp, bytes } => Some(OplogEntry::Log {
1669                timestamp: *timestamp,
1670                level: oplog::LogLevel::Stdout,
1671                context: String::new(),
1672                message: String::from_utf8_lossy(bytes).to_string(),
1673            }),
1674            WorkerEvent::StdErr { timestamp, bytes } => Some(OplogEntry::Log {
1675                timestamp: *timestamp,
1676                level: oplog::LogLevel::Stderr,
1677                context: String::new(),
1678                message: String::from_utf8_lossy(bytes).to_string(),
1679            }),
1680            WorkerEvent::Log {
1681                timestamp,
1682                level,
1683                context,
1684                message,
1685            } => Some(OplogEntry::Log {
1686                timestamp: *timestamp,
1687                level: match level {
1688                    LogLevel::Trace => oplog::LogLevel::Trace,
1689                    LogLevel::Debug => oplog::LogLevel::Debug,
1690                    LogLevel::Info => oplog::LogLevel::Info,
1691                    LogLevel::Warn => oplog::LogLevel::Warn,
1692                    LogLevel::Error => oplog::LogLevel::Error,
1693                    LogLevel::Critical => oplog::LogLevel::Critical,
1694                },
1695                context: context.clone(),
1696                message: message.clone(),
1697            }),
1698            WorkerEvent::InvocationStart { .. } => None,
1699            WorkerEvent::InvocationFinished { .. } => None,
1700            WorkerEvent::Close => None,
1701        }
1702    }
1703}
1704
1705impl Display for WorkerEvent {
1706    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1707        match self {
1708            WorkerEvent::StdOut { bytes, .. } => {
1709                write!(
1710                    f,
1711                    "<stdout> {}",
1712                    String::from_utf8(bytes.clone()).unwrap_or_default()
1713                )
1714            }
1715            WorkerEvent::StdErr { bytes, .. } => {
1716                write!(
1717                    f,
1718                    "<stderr> {}",
1719                    String::from_utf8(bytes.clone()).unwrap_or_default()
1720                )
1721            }
1722            WorkerEvent::Log {
1723                level,
1724                context,
1725                message,
1726                ..
1727            } => {
1728                write!(f, "<log> {:?} {} {}", level, context, message)
1729            }
1730            WorkerEvent::InvocationStart {
1731                function,
1732                idempotency_key,
1733                ..
1734            } => {
1735                write!(f, "<invocation-start> {} {}", function, idempotency_key)
1736            }
1737            WorkerEvent::InvocationFinished {
1738                function,
1739                idempotency_key,
1740                ..
1741            } => {
1742                write!(f, "<invocation-finished> {} {}", function, idempotency_key)
1743            }
1744            WorkerEvent::Close => {
1745                write!(f, "<close>")
1746            }
1747        }
1748    }
1749}
1750
// Component type with explicit `i32` discriminants: 0 for durable and 1 for
// ephemeral.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
#[repr(i32)]
pub enum ComponentType {
    Durable = 0,
    Ephemeral = 1,
}
1758
1759impl TryFrom<i32> for ComponentType {
1760    type Error = String;
1761
1762    fn try_from(value: i32) -> Result<Self, Self::Error> {
1763        match value {
1764            0 => Ok(ComponentType::Durable),
1765            1 => Ok(ComponentType::Ephemeral),
1766            _ => Err(format!("Unknown Component Type: {}", value)),
1767        }
1768    }
1769}
1770
1771impl Display for ComponentType {
1772    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1773        let s = match self {
1774            ComponentType::Durable => "Durable",
1775            ComponentType::Ephemeral => "Ephemeral",
1776        };
1777        write!(f, "{}", s)
1778    }
1779}
1780
1781impl FromStr for ComponentType {
1782    type Err = String;
1783
1784    fn from_str(s: &str) -> Result<Self, Self::Err> {
1785        match s {
1786            "Durable" => Ok(ComponentType::Durable),
1787            "Ephemeral" => Ok(ComponentType::Ephemeral),
1788            _ => Err(format!("Unknown Component Type: {}", s)),
1789        }
1790    }
1791}
1792
// Struct without any fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct Empty {}
1798
/// Key that can be used to identify a component file.
/// All files with the same content will have the same key.
// Newtype over `String`; the inner value is public.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::NewType))]
pub struct InitialComponentFileKey(pub String);
1804
1805impl Display for InitialComponentFileKey {
1806    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1807        self.0.fmt(f)
1808    }
1809}
1810
/// Path inside a component filesystem. Must be
/// - absolute (start with '/')
/// - not contain ".." components
/// - not contain "." components
/// - use '/' as a separator
///
/// The wrapped path buffer is private; values are created through the
/// `from_*_str` constructors, which reject non-absolute input and normalize
/// the path.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct ComponentFilePath(Utf8UnixPathBuf);
1818
1819impl ComponentFilePath {
1820    pub fn from_abs_str(s: &str) -> Result<Self, String> {
1821        let buf: Utf8UnixPathBuf = s.into();
1822        if !buf.is_absolute() {
1823            return Err("Path must be absolute".to_string());
1824        }
1825
1826        Ok(ComponentFilePath(buf.normalize()))
1827    }
1828
1829    pub fn from_rel_str(s: &str) -> Result<Self, String> {
1830        Self::from_abs_str(&format!("/{}", s))
1831    }
1832
1833    pub fn from_either_str(s: &str) -> Result<Self, String> {
1834        if s.starts_with('/') {
1835            Self::from_abs_str(s)
1836        } else {
1837            Self::from_rel_str(s)
1838        }
1839    }
1840
1841    pub fn as_path(&self) -> &Utf8UnixPathBuf {
1842        &self.0
1843    }
1844
1845    pub fn to_rel_string(&self) -> String {
1846        self.0.strip_prefix("/").unwrap().to_string()
1847    }
1848
1849    pub fn extend(&mut self, path: &str) -> Result<(), String> {
1850        self.0.push_checked(path).map_err(|e| e.to_string())
1851    }
1852}
1853
1854impl Display for ComponentFilePath {
1855    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1856        self.0.fmt(f)
1857    }
1858}
1859
1860impl Serialize for ComponentFilePath {
1861    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1862    where
1863        S: Serializer,
1864    {
1865        String::serialize(&self.to_string(), serializer)
1866    }
1867}
1868
1869impl<'de> Deserialize<'de> for ComponentFilePath {
1870    fn deserialize<D>(deserializer: D) -> Result<ComponentFilePath, D::Error>
1871    where
1872        D: Deserializer<'de>,
1873    {
1874        let str = String::deserialize(deserializer)?;
1875        Self::from_abs_str(&str).map_err(de::Error::custom)
1876    }
1877}
1878
1879impl TryFrom<&str> for ComponentFilePath {
1880    type Error = String;
1881    fn try_from(value: &str) -> Result<Self, Self::Error> {
1882        Self::from_either_str(value)
1883    }
1884}
1885
// Access mode of a component file. The serde (and, with the `poem` feature,
// OpenAPI) representation is kebab-case: `read-only` / `read-write`.
// The short `ro` / `rw` form is available through `as_compact_str` and
// `from_compact_str`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
#[serde(rename_all = "kebab-case")]
#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
pub enum ComponentFilePermissions {
    ReadOnly,
    ReadWrite,
}
1894
1895impl ComponentFilePermissions {
1896    pub fn as_compact_str(&self) -> &'static str {
1897        match self {
1898            ComponentFilePermissions::ReadOnly => "ro",
1899            ComponentFilePermissions::ReadWrite => "rw",
1900        }
1901    }
1902    pub fn from_compact_str(s: &str) -> Result<Self, String> {
1903        match s {
1904            "ro" => Ok(ComponentFilePermissions::ReadOnly),
1905            "rw" => Ok(ComponentFilePermissions::ReadWrite),
1906            _ => Err(format!("Unknown permissions: {}", s)),
1907        }
1908    }
1909}
1910
// Associates a file key with the path and permissions of a component file.
// Field names are camelCase in the serde (and, with `poem`, OpenAPI)
// representation.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct InitialComponentFile {
    // Key identifying the file (see `InitialComponentFileKey`).
    pub key: InitialComponentFileKey,
    // Path of the file inside the component filesystem.
    pub path: ComponentFilePath,
    // Access mode of the file.
    pub permissions: ComponentFilePermissions,
}
1920
1921impl InitialComponentFile {
1922    pub fn is_read_only(&self) -> bool {
1923        self.permissions == ComponentFilePermissions::ReadOnly
1924    }
1925}
1926
// A component file path together with its access mode. Field names are
// camelCase in the serde (and, with `poem`, OpenAPI) representation.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ComponentFilePathWithPermissions {
    // Path inside the component filesystem.
    pub path: ComponentFilePath,
    // Access mode that applies to `path`.
    pub permissions: ComponentFilePermissions,
}
1935
1936impl ComponentFilePathWithPermissions {
1937    pub fn extend_path(&mut self, path: &str) -> Result<(), String> {
1938        self.path.extend(path)
1939    }
1940}
1941
1942impl Display for ComponentFilePathWithPermissions {
1943    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1944        write!(f, "{}", serde_json::to_string(self).unwrap())
1945    }
1946}
1947
// Wrapper around a list of `ComponentFilePathWithPermissions`; serialized as
// an object with a single `values` field.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ComponentFilePathWithPermissionsList {
    // The wrapped entries.
    pub values: Vec<ComponentFilePathWithPermissions>,
}
1955
1956impl Display for ComponentFilePathWithPermissionsList {
1957    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1958        write!(f, "{}", serde_json::to_string(self).unwrap())
1959    }
1960}
1961
/// Kind-specific details of a filesystem node: either a file or a directory.
#[derive(Clone, Debug, PartialEq)]
pub enum ComponentFileSystemNodeDetails {
    /// A file, with its access mode and size.
    File {
        permissions: ComponentFilePermissions,
        // NOTE(review): presumably the size in bytes — confirm with the producer.
        size: u64,
    },
    /// A directory; carries no extra data.
    Directory,
}
1970
/// A single entry of a component filesystem.
#[derive(Clone, Debug, PartialEq)]
pub struct ComponentFileSystemNode {
    /// Name of the entry.
    // NOTE(review): presumably the last path segment, not a full path — confirm.
    pub name: String,
    /// Last modification time of the entry.
    pub last_modified: SystemTime,
    /// File- or directory-specific details.
    pub details: ComponentFileSystemNodeDetails,
}
1977
// Binding type selector. Serialized in kebab-case (`default`, `file-server`,
// `http-handler`, `cors-preflight`); `Default` is also the `Default::default()`
// value. `Deserialize` is not derived here: it is implemented by hand
// elsewhere in this file.
#[derive(Debug, Clone, PartialEq, Serialize, Encode, Decode, Default)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
#[serde(rename_all = "kebab-case")]
#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
pub enum GatewayBindingType {
    #[default]
    Default,
    FileServer,
    HttpHandler,
    CorsPreflight,
}
1989
1990// To keep backward compatibility as we documented wit-worker to be default
1991impl<'de> Deserialize<'de> for GatewayBindingType {
1992    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1993    where
1994        D: Deserializer<'de>,
1995    {
1996        struct GatewayBindingTypeVisitor;
1997
1998        impl de::Visitor<'_> for GatewayBindingTypeVisitor {
1999            type Value = GatewayBindingType;
2000
2001            fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
2002                formatter.write_str("a string representing the binding type")
2003            }
2004
2005            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
2006            where
2007                E: de::Error,
2008            {
2009                match value {
2010                    "default" | "wit-worker" => Ok(GatewayBindingType::Default),
2011                    "file-server" => Ok(GatewayBindingType::FileServer),
2012                    "cors-preflight" => Ok(GatewayBindingType::CorsPreflight),
2013                    _ => Err(de::Error::invalid_value(Unexpected::Str(value), &self)),
2014                }
2015            }
2016        }
2017
2018        deserializer.deserialize_str(GatewayBindingTypeVisitor)
2019    }
2020}
2021
2022impl TryFrom<String> for GatewayBindingType {
2023    type Error = String;
2024
2025    fn try_from(value: String) -> Result<Self, Self::Error> {
2026        match value.as_str() {
2027            "default" => Ok(GatewayBindingType::Default),
2028            "file-server" => Ok(GatewayBindingType::FileServer),
2029            _ => Err(format!("Invalid WorkerBindingType: {}", value)),
2030        }
2031    }
2032}
2033
2034impl From<crate::model::WorkerId> for golem_wasm_rpc::WorkerId {
2035    fn from(worker_id: crate::model::WorkerId) -> Self {
2036        golem_wasm_rpc::WorkerId {
2037            component_id: worker_id.component_id.into(),
2038            worker_name: worker_id.worker_name,
2039        }
2040    }
2041}
2042
2043impl From<golem_wasm_rpc::WorkerId> for crate::model::WorkerId {
2044    fn from(host: golem_wasm_rpc::WorkerId) -> Self {
2045        Self {
2046            component_id: host.component_id.into(),
2047            worker_name: host.worker_name,
2048        }
2049    }
2050}
2051
2052impl From<golem_wasm_rpc::ComponentId> for crate::model::ComponentId {
2053    fn from(host: golem_wasm_rpc::ComponentId) -> Self {
2054        let high_bits = host.uuid.high_bits;
2055        let low_bits = host.uuid.low_bits;
2056
2057        Self(uuid::Uuid::from_u64_pair(high_bits, low_bits))
2058    }
2059}
2060
2061impl From<crate::model::ComponentId> for golem_wasm_rpc::ComponentId {
2062    fn from(component_id: crate::model::ComponentId) -> Self {
2063        let (high_bits, low_bits) = component_id.0.as_u64_pair();
2064
2065        golem_wasm_rpc::ComponentId {
2066            uuid: golem_wasm_rpc::Uuid {
2067                high_bits,
2068                low_bits,
2069            },
2070        }
2071    }
2072}
2073
2074#[cfg(test)]
2075mod tests {
2076    use std::collections::HashSet;
2077    use std::str::FromStr;
2078    use std::time::SystemTime;
2079    use std::vec;
2080    use test_r::test;
2081    use tracing::info;
2082
2083    use crate::model::oplog::OplogIndex;
2084
2085    use crate::model::{
2086        AccountId, ComponentFilePath, ComponentId, FilterComparator, IdempotencyKey, ShardId,
2087        StringFilterComparator, TargetWorkerId, Timestamp, WorkerFilter, WorkerId, WorkerMetadata,
2088        WorkerStatus, WorkerStatusRecord,
2089    };
2090    use bincode::{Decode, Encode};
2091
2092    use rand::{rng, Rng};
2093    use serde::{Deserialize, Serialize};
2094
2095    #[test]
2096    fn timestamp_conversion() {
2097        let ts: Timestamp = Timestamp::now_utc();
2098
2099        let prost_ts: prost_types::Timestamp = ts.into();
2100
2101        let ts2: Timestamp = prost_ts.into();
2102
2103        assert_eq!(ts2, ts);
2104    }
2105
    // Test-only wrapper with a single `AccountId` field, used by the JSON
    // tests below to check how `AccountId` is encoded inside a struct.
    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
    struct ExampleWithAccountId {
        account_id: AccountId,
    }
2110
2111    #[test]
2112    fn account_id_from_json_apigateway_version() {
2113        let json = "{ \"account_id\": \"account-1\" }";
2114        let example: ExampleWithAccountId = serde_json::from_str(json).unwrap();
2115        assert_eq!(
2116            example.account_id,
2117            AccountId {
2118                value: "account-1".to_string()
2119            }
2120        );
2121    }
2122
2123    #[test]
2124    fn account_id_json_serialization() {
2125        // We want to use this variant for serialization because it is used on the public API gateway API
2126        let example: ExampleWithAccountId = ExampleWithAccountId {
2127            account_id: AccountId {
2128                value: "account-1".to_string(),
2129            },
2130        };
2131        let json = serde_json::to_string(&example).unwrap();
2132        assert_eq!(json, "{\"account_id\":\"account-1\"}");
2133    }
2134
2135    #[test]
2136    fn worker_filter_parse() {
2137        assert_eq!(
2138            WorkerFilter::from_str(" name =  worker-1").unwrap(),
2139            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2140        );
2141
2142        assert_eq!(
2143            WorkerFilter::from_str("status == Running").unwrap(),
2144            WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2145        );
2146
2147        assert_eq!(
2148            WorkerFilter::from_str("version >= 10").unwrap(),
2149            WorkerFilter::new_version(FilterComparator::GreaterEqual, 10)
2150        );
2151
2152        assert_eq!(
2153            WorkerFilter::from_str("env.tag1 == abc ").unwrap(),
2154            WorkerFilter::new_env(
2155                "tag1".to_string(),
2156                StringFilterComparator::Equal,
2157                "abc".to_string(),
2158            )
2159        );
2160    }
2161
2162    #[test]
2163    fn worker_filter_combination() {
2164        assert_eq!(
2165            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).not(),
2166            WorkerFilter::new_not(WorkerFilter::new_name(
2167                StringFilterComparator::Equal,
2168                "worker-1".to_string(),
2169            ))
2170        );
2171
2172        assert_eq!(
2173            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).and(
2174                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2175            ),
2176            WorkerFilter::new_and(vec![
2177                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2178                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2179            ])
2180        );
2181
2182        assert_eq!(
2183            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2184                .and(WorkerFilter::new_status(
2185                    FilterComparator::Equal,
2186                    WorkerStatus::Running,
2187                ))
2188                .and(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2189            WorkerFilter::new_and(vec![
2190                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2191                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2192                WorkerFilter::new_version(FilterComparator::Equal, 1),
2193            ])
2194        );
2195
2196        assert_eq!(
2197            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).or(
2198                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2199            ),
2200            WorkerFilter::new_or(vec![
2201                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2202                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2203            ])
2204        );
2205
2206        assert_eq!(
2207            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2208                .or(WorkerFilter::new_status(
2209                    FilterComparator::NotEqual,
2210                    WorkerStatus::Running,
2211                ))
2212                .or(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2213            WorkerFilter::new_or(vec![
2214                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2215                WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2216                WorkerFilter::new_version(FilterComparator::Equal, 1),
2217            ])
2218        );
2219
2220        assert_eq!(
2221            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2222                .and(WorkerFilter::new_status(
2223                    FilterComparator::NotEqual,
2224                    WorkerStatus::Running,
2225                ))
2226                .or(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2227            WorkerFilter::new_or(vec![
2228                WorkerFilter::new_and(vec![
2229                    WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2230                    WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2231                ]),
2232                WorkerFilter::new_version(FilterComparator::Equal, 1),
2233            ])
2234        );
2235
2236        assert_eq!(
2237            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2238                .or(WorkerFilter::new_status(
2239                    FilterComparator::NotEqual,
2240                    WorkerStatus::Running,
2241                ))
2242                .and(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2243            WorkerFilter::new_and(vec![
2244                WorkerFilter::new_or(vec![
2245                    WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2246                    WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2247                ]),
2248                WorkerFilter::new_version(FilterComparator::Equal, 1),
2249            ])
2250        );
2251    }
2252
2253    #[test]
2254    fn worker_filter_matches() {
2255        let component_id = ComponentId::new_v4();
2256        let worker_metadata = WorkerMetadata {
2257            worker_id: WorkerId {
2258                worker_name: "worker-1".to_string(),
2259                component_id,
2260            },
2261            args: vec![],
2262            env: vec![
2263                ("env1".to_string(), "value1".to_string()),
2264                ("env2".to_string(), "value2".to_string()),
2265            ],
2266            account_id: AccountId {
2267                value: "account-1".to_string(),
2268            },
2269            created_at: Timestamp::now_utc(),
2270            parent: None,
2271            last_known_status: WorkerStatusRecord {
2272                component_version: 1,
2273                ..WorkerStatusRecord::default()
2274            },
2275        };
2276
2277        assert!(
2278            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2279                .and(WorkerFilter::new_status(
2280                    FilterComparator::Equal,
2281                    WorkerStatus::Idle,
2282                ))
2283                .matches(&worker_metadata)
2284        );
2285
2286        assert!(WorkerFilter::new_env(
2287            "env1".to_string(),
2288            StringFilterComparator::Equal,
2289            "value1".to_string(),
2290        )
2291        .and(WorkerFilter::new_status(
2292            FilterComparator::Equal,
2293            WorkerStatus::Idle,
2294        ))
2295        .matches(&worker_metadata));
2296
2297        assert!(WorkerFilter::new_env(
2298            "env1".to_string(),
2299            StringFilterComparator::Equal,
2300            "value2".to_string(),
2301        )
2302        .not()
2303        .and(
2304            WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running).or(
2305                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Idle)
2306            )
2307        )
2308        .matches(&worker_metadata));
2309
2310        assert!(
2311            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2312                .and(WorkerFilter::new_version(FilterComparator::Equal, 1))
2313                .matches(&worker_metadata)
2314        );
2315
2316        assert!(
2317            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-2".to_string())
2318                .or(WorkerFilter::new_version(FilterComparator::Equal, 1))
2319                .matches(&worker_metadata)
2320        );
2321
2322        assert!(WorkerFilter::new_version(FilterComparator::GreaterEqual, 1)
2323            .and(WorkerFilter::new_version(FilterComparator::Less, 2))
2324            .or(WorkerFilter::new_name(
2325                StringFilterComparator::Equal,
2326                "worker-2".to_string(),
2327            ))
2328            .matches(&worker_metadata));
2329    }
2330
2331    #[test]
2332    fn target_worker_id_force_shards() {
2333        let mut rng = rng();
2334        const SHARD_COUNT: usize = 1000;
2335        const EXAMPLE_COUNT: usize = 1000;
2336        for _ in 0..EXAMPLE_COUNT {
2337            let mut shard_ids = HashSet::new();
2338            let count = rng.random_range(0..100);
2339            for _ in 0..count {
2340                let shard_id = rng.random_range(0..SHARD_COUNT);
2341                shard_ids.insert(ShardId {
2342                    value: shard_id as i64,
2343                });
2344            }
2345
2346            let component_id = ComponentId::new_v4();
2347            let target_worker_id = TargetWorkerId {
2348                component_id,
2349                worker_name: None,
2350            };
2351
2352            let start = SystemTime::now();
2353            let worker_id = target_worker_id.into_worker_id(&shard_ids, SHARD_COUNT);
2354            let end = SystemTime::now();
2355            info!(
2356                "Time with {count} valid shards: {:?}",
2357                end.duration_since(start).unwrap()
2358            );
2359
2360            if !shard_ids.is_empty() {
2361                assert!(shard_ids.contains(&ShardId::from_worker_id(&worker_id, SHARD_COUNT)));
2362            }
2363        }
2364    }
2365
2366    #[test]
2367    fn derived_idempotency_key() {
2368        let base1 = IdempotencyKey::fresh();
2369        let base2 = IdempotencyKey::fresh();
2370        let base3 = IdempotencyKey {
2371            value: "base3".to_string(),
2372        };
2373
2374        assert_ne!(base1, base2);
2375
2376        let idx1 = OplogIndex::from_u64(2);
2377        let idx2 = OplogIndex::from_u64(11);
2378
2379        let derived11a = IdempotencyKey::derived(&base1, idx1);
2380        let derived12a = IdempotencyKey::derived(&base1, idx2);
2381        let derived21a = IdempotencyKey::derived(&base2, idx1);
2382        let derived22a = IdempotencyKey::derived(&base2, idx2);
2383
2384        let derived11b = IdempotencyKey::derived(&base1, idx1);
2385        let derived12b = IdempotencyKey::derived(&base1, idx2);
2386        let derived21b = IdempotencyKey::derived(&base2, idx1);
2387        let derived22b = IdempotencyKey::derived(&base2, idx2);
2388
2389        let derived31 = IdempotencyKey::derived(&base3, idx1);
2390        let derived32 = IdempotencyKey::derived(&base3, idx2);
2391
2392        assert_eq!(derived11a, derived11b);
2393        assert_eq!(derived12a, derived12b);
2394        assert_eq!(derived21a, derived21b);
2395        assert_eq!(derived22a, derived22b);
2396
2397        assert_ne!(derived11a, derived12a);
2398        assert_ne!(derived11a, derived21a);
2399        assert_ne!(derived11a, derived22a);
2400        assert_ne!(derived12a, derived21a);
2401        assert_ne!(derived12a, derived22a);
2402        assert_ne!(derived21a, derived22a);
2403
2404        assert_ne!(derived11a, derived31);
2405        assert_ne!(derived21a, derived31);
2406        assert_ne!(derived12a, derived32);
2407        assert_ne!(derived22a, derived32);
2408        assert_ne!(derived31, derived32);
2409    }
2410
2411    #[test]
2412    fn initial_component_file_path_from_absolute() {
2413        let path = ComponentFilePath::from_abs_str("/a/b/c").unwrap();
2414        assert_eq!(path.to_string(), "/a/b/c");
2415    }
2416
2417    #[test]
2418    fn initial_component_file_path_from_relative() {
2419        let path = ComponentFilePath::from_abs_str("a/b/c");
2420        assert!(path.is_err());
2421    }
2422}