golem_common/model/
mod.rs

1// Copyright 2024-2025 Golem Cloud
2//
3// Licensed under the Golem Source License v1.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://license.golem.cloud/LICENSE
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::model::oplog::{IndexedResourceKey, TimestampedUpdateDescription, WorkerResourceId};
16use crate::model::regions::DeletedRegions;
17use bincode::de::{BorrowDecoder, Decoder};
18use bincode::enc::Encoder;
19use bincode::error::{DecodeError, EncodeError};
20use bincode::{BorrowDecode, Decode, Encode};
21
22pub use crate::base_model::*;
23use crate::model::invocation_context::InvocationContextStack;
24use golem_wasm_ast::analysis::analysed_type::{field, list, record, str, tuple, u32, u64};
25use golem_wasm_ast::analysis::AnalysedType;
26use golem_wasm_rpc::{IntoValue, Value};
27use golem_wasm_rpc_derive::IntoValue;
28use http::Uri;
29use rand::prelude::IteratorRandom;
30use serde::de::Unexpected;
31use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
32use std::cmp::Ordering;
33use std::collections::{HashMap, HashSet, VecDeque};
34use std::fmt::{Display, Formatter};
35use std::ops::Add;
36use std::str::FromStr;
37use std::time::{Duration, SystemTime};
38use typed_path::Utf8UnixPathBuf;
39use uuid::{uuid, Uuid};
40
41pub mod auth;
42pub mod base64;
43pub mod component;
44pub mod component_constraint;
45pub mod component_metadata;
46pub mod config;
47pub mod error;
48pub mod exports;
49pub mod invocation_context;
50pub mod lucene;
51pub mod oplog;
52pub mod plugin;
53pub mod project;
54pub mod public_oplog;
55pub mod regions;
56pub mod trim_date;
57
58#[cfg(feature = "poem")]
59mod poem;
60
61#[cfg(feature = "protobuf")]
62pub mod protobuf;
63
/// Marker trait for types usable in the poem-based API layer.
///
/// With the `poem` feature enabled it requires the `poem_openapi` `Type`,
/// `ParseFromJSON` and `ToJSON` traits.
#[cfg(feature = "poem")]
pub trait PoemTypeRequirements:
    poem_openapi::types::Type + poem_openapi::types::ParseFromJSON + poem_openapi::types::ToJSON
{
}

/// Marker trait for types usable in the poem-based API layer.
///
/// Without the `poem` feature it carries no requirements at all.
#[cfg(not(feature = "poem"))]
pub trait PoemTypeRequirements {}

// Blanket impl: every type that satisfies the poem bounds gets the marker.
#[cfg(feature = "poem")]
impl<T> PoemTypeRequirements for T where
    T: poem_openapi::types::Type + poem_openapi::types::ParseFromJSON + poem_openapi::types::ToJSON
{
}

// Blanket impl: without the feature every type gets the marker.
#[cfg(not(feature = "poem"))]
impl<T> PoemTypeRequirements for T {}
84
/// Marker trait for types usable as multipart fields.
///
/// With the `poem` feature enabled it requires `poem_openapi::types::ParseFromMultipartField`.
#[cfg(feature = "poem")]
pub trait PoemMultipartTypeRequirements: poem_openapi::types::ParseFromMultipartField {}

/// Marker trait for types usable as multipart fields.
///
/// Without the `poem` feature it carries no requirements at all.
#[cfg(not(feature = "poem"))]
pub trait PoemMultipartTypeRequirements {}

// Blanket impl: every multipart-parsable type gets the marker.
#[cfg(feature = "poem")]
impl<T> PoemMultipartTypeRequirements for T where T: poem_openapi::types::ParseFromMultipartField {}

// Blanket impl: without the feature every type gets the marker.
#[cfg(not(feature = "poem"))]
impl<T> PoemMultipartTypeRequirements for T {}
96
97#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
98#[repr(transparent)]
99pub struct Timestamp(iso8601_timestamp::Timestamp);
100
101impl Timestamp {
102    pub fn now_utc() -> Timestamp {
103        Timestamp(iso8601_timestamp::Timestamp::now_utc())
104    }
105
106    pub fn to_millis(&self) -> u64 {
107        self.0
108            .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
109            .whole_milliseconds() as u64
110    }
111}
112
113impl Display for Timestamp {
114    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115        write!(f, "{}", self.0)
116    }
117}
118
119impl FromStr for Timestamp {
120    type Err = String;
121
122    fn from_str(s: &str) -> Result<Self, Self::Err> {
123        match iso8601_timestamp::Timestamp::parse(s) {
124            Some(ts) => Ok(Self(ts)),
125            None => Err("Invalid timestamp".to_string()),
126        }
127    }
128}
129
130impl serde::Serialize for Timestamp {
131    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
132    where
133        S: Serializer,
134    {
135        self.0.serialize(serializer)
136    }
137}
138
139impl<'de> serde::Deserialize<'de> for Timestamp {
140    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
141    where
142        D: Deserializer<'de>,
143    {
144        if deserializer.is_human_readable() {
145            iso8601_timestamp::Timestamp::deserialize(deserializer).map(Self)
146        } else {
147            // For non-human-readable formats we assume it was an i64 representing milliseconds from epoch
148            let timestamp = i64::deserialize(deserializer)?;
149            Ok(Timestamp(
150                iso8601_timestamp::Timestamp::UNIX_EPOCH
151                    .add(Duration::from_millis(timestamp as u64)),
152            ))
153        }
154    }
155}
156
157impl bincode::Encode for Timestamp {
158    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
159        (self
160            .0
161            .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH)
162            .whole_milliseconds() as i64)
163            .encode(encoder)
164    }
165}
166
167impl<Context> bincode::Decode<Context> for Timestamp {
168    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
169        let timestamp: i64 = bincode::Decode::decode(decoder)?;
170        Ok(Timestamp(
171            iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
172        ))
173    }
174}
175
176impl<'de, Context> bincode::BorrowDecode<'de, Context> for Timestamp {
177    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
178        decoder: &mut D,
179    ) -> Result<Self, DecodeError> {
180        let timestamp: i64 = bincode::BorrowDecode::borrow_decode(decoder)?;
181        Ok(Timestamp(
182            iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(timestamp as u64)),
183        ))
184    }
185}
186
187impl From<u64> for Timestamp {
188    fn from(value: u64) -> Self {
189        Timestamp(iso8601_timestamp::Timestamp::UNIX_EPOCH.add(Duration::from_millis(value)))
190    }
191}
192
193impl IntoValue for Timestamp {
194    fn into_value(self) -> Value {
195        let d = self
196            .0
197            .duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH);
198        Value::Record(vec![
199            Value::U64(d.whole_seconds() as u64),
200            Value::U32(d.subsec_nanoseconds() as u32),
201        ])
202    }
203
204    fn get_type() -> AnalysedType {
205        record(vec![field("seconds", u64()), field("nanoseconds", u32())])
206    }
207}
208
209/// Associates a worker-id with its owner project
210#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode)]
211pub struct OwnedWorkerId {
212    pub project_id: ProjectId,
213    pub worker_id: WorkerId,
214}
215
216impl OwnedWorkerId {
217    pub fn new(project_id: &ProjectId, worker_id: &WorkerId) -> Self {
218        Self {
219            project_id: project_id.clone(),
220            worker_id: worker_id.clone(),
221        }
222    }
223
224    pub fn worker_id(&self) -> WorkerId {
225        self.worker_id.clone()
226    }
227
228    pub fn project_id(&self) -> ProjectId {
229        self.project_id.clone()
230    }
231
232    pub fn component_id(&self) -> ComponentId {
233        self.worker_id.component_id.clone()
234    }
235
236    pub fn worker_name(&self) -> String {
237        self.worker_id.worker_name.clone()
238    }
239}
240
241impl Display for OwnedWorkerId {
242    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243        write!(f, "{}/{}", self.project_id, self.worker_id)
244    }
245}
246
247impl AsRef<WorkerId> for OwnedWorkerId {
248    fn as_ref(&self) -> &WorkerId {
249        &self.worker_id
250    }
251}
252
/// Actions that can be scheduled to be executed at a given point in time
///
/// NOTE(review): the type derives `Encode`/`Decode`, so reordering variants or
/// fields presumably changes the binary representation — confirm before restructuring.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub enum ScheduledAction {
    /// Completes a given promise
    CompletePromise {
        account_id: AccountId,
        project_id: ProjectId,
        promise_id: PromiseId,
    },
    /// Archives all entries from the first non-empty layer of an oplog to the next layer,
    /// if the last oplog index did not change. If there are more layers below, schedules
    /// a next action to archive the next layer.
    ArchiveOplog {
        account_id: AccountId,
        owned_worker_id: OwnedWorkerId,
        last_oplog_index: OplogIndex,
        next_after: Duration,
    },
    /// Invoke the given action on the worker. The invocation will only
    /// be persisted in the oplog when it's actually getting scheduled.
    Invoke {
        account_id: AccountId,
        owned_worker_id: OwnedWorkerId,
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
282
283impl ScheduledAction {
284    pub fn owned_worker_id(&self) -> OwnedWorkerId {
285        match self {
286            ScheduledAction::CompletePromise {
287                project_id,
288                promise_id,
289                ..
290            } => OwnedWorkerId::new(project_id, &promise_id.worker_id),
291            ScheduledAction::ArchiveOplog {
292                owned_worker_id, ..
293            } => owned_worker_id.clone(),
294            ScheduledAction::Invoke {
295                owned_worker_id, ..
296            } => owned_worker_id.clone(),
297        }
298    }
299}
300
301impl Display for ScheduledAction {
302    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303        match self {
304            ScheduledAction::CompletePromise { promise_id, .. } => {
305                write!(f, "complete[{promise_id}]")
306            }
307            ScheduledAction::ArchiveOplog {
308                owned_worker_id, ..
309            } => {
310                write!(f, "archive[{owned_worker_id}]")
311            }
312            ScheduledAction::Invoke {
313                owned_worker_id, ..
314            } => write!(f, "invoke[{owned_worker_id}]"),
315        }
316    }
317}
318
319#[derive(Debug, Clone, Encode, Decode)]
320pub struct ScheduleId {
321    pub timestamp: i64,
322    pub action: ScheduledAction,
323}
324
325impl Display for ScheduleId {
326    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
327        write!(f, "{}@{}", self.action, self.timestamp)
328    }
329}
330
/// Wrapper around a shard count; `RoutingTable` stores one and passes its
/// `value` to `ShardId::from_worker_id` during lookups.
#[derive(Clone)]
pub struct NumberOfShards {
    /// The number of shards.
    pub value: usize,
}
335
336#[derive(Clone, Debug, PartialEq, Eq, Hash)]
337pub struct Pod {
338    host: String,
339    port: u16,
340}
341
342impl Pod {
343    pub fn uri(&self) -> Uri {
344        Uri::builder()
345            .scheme("http")
346            .authority(format!("{}:{}", self.host, self.port).as_str())
347            .path_and_query("/")
348            .build()
349            .expect("Failed to build URI")
350    }
351}
352
353#[derive(Clone)]
354pub struct RoutingTable {
355    pub number_of_shards: NumberOfShards,
356    shard_assignments: HashMap<ShardId, Pod>,
357}
358
359impl RoutingTable {
360    pub fn lookup(&self, worker_id: &WorkerId) -> Option<&Pod> {
361        self.shard_assignments.get(&ShardId::from_worker_id(
362            &worker_id.clone(),
363            self.number_of_shards.value,
364        ))
365    }
366
367    pub fn random(&self) -> Option<&Pod> {
368        self.shard_assignments.values().choose(&mut rand::rng())
369    }
370
371    pub fn first(&self) -> Option<&Pod> {
372        self.shard_assignments.values().next()
373    }
374
375    pub fn all(&self) -> HashSet<&Pod> {
376        self.shard_assignments.values().collect()
377    }
378}
379
/// Pairs a shard id with a pod.
///
/// NOTE(review): neither field is read in the visible part of this file, which is
/// presumably why `dead_code` is allowed — confirm whether the type is still needed.
#[allow(dead_code)]
pub struct RoutingTableEntry {
    shard_id: ShardId,
    pod: Pod,
}
385
386#[derive(Clone, Debug, Default)]
387pub struct ShardAssignment {
388    pub number_of_shards: usize,
389    pub shard_ids: HashSet<ShardId>,
390}
391
392impl ShardAssignment {
393    pub fn new(number_of_shards: usize, shard_ids: HashSet<ShardId>) -> Self {
394        Self {
395            number_of_shards,
396            shard_ids,
397        }
398    }
399
400    pub fn assign_shards(&mut self, shard_ids: &HashSet<ShardId>) {
401        for shard_id in shard_ids {
402            self.shard_ids.insert(*shard_id);
403        }
404    }
405
406    pub fn register(&mut self, number_of_shards: usize, shard_ids: &HashSet<ShardId>) {
407        self.number_of_shards = number_of_shards;
408        for shard_id in shard_ids {
409            self.shard_ids.insert(*shard_id);
410        }
411    }
412
413    pub fn revoke_shards(&mut self, shard_ids: &HashSet<ShardId>) {
414        for shard_id in shard_ids {
415            self.shard_ids.remove(shard_id);
416        }
417    }
418}
419
420impl Display for ShardAssignment {
421    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
422        let shard_ids = self
423            .shard_ids
424            .iter()
425            .map(|shard_id| shard_id.to_string())
426            .collect::<Vec<_>>()
427            .join(",");
428        write!(
429            f,
430            "{{ number_of_shards: {}, shard_ids: {} }}",
431            self.number_of_shards, shard_ids
432        )
433    }
434}
435
436#[derive(Clone, Debug, Encode, Decode, Eq, Hash, PartialEq, IntoValue)]
437#[wit_transparent]
438pub struct IdempotencyKey {
439    pub value: String,
440}
441
442impl IdempotencyKey {
443    const ROOT_NS: Uuid = uuid!("9C19B15A-C83D-46F7-9BC3-EAD7923733F4");
444
445    pub fn new(value: String) -> Self {
446        Self { value }
447    }
448
449    pub fn from_uuid(value: Uuid) -> Self {
450        Self {
451            value: value.to_string(),
452        }
453    }
454
455    pub fn fresh() -> Self {
456        Self::from_uuid(Uuid::new_v4())
457    }
458
459    /// Generates a deterministic new idempotency key using a base idempotency key and an oplog index.
460    ///
461    /// The base idempotency key determines the "namespace" of the generated key UUIDv5. If
462    /// the base idempotency key is already an UUID, it is directly used as the namespace of the v5 algorithm,
463    /// while the name part is derived from the given oplog index.
464    ///
465    /// If the base idempotency key is not an UUID (as it can be an arbitrary user-provided string), then first
466    /// we generate a UUIDv5 in the ROOT_NS namespace and use that as unique namespace for generating
467    /// the new idempotency key.
468    pub fn derived(base: &IdempotencyKey, oplog_index: OplogIndex) -> Self {
469        let namespace = if let Ok(base_uuid) = Uuid::parse_str(&base.value) {
470            base_uuid
471        } else {
472            Uuid::new_v5(&Self::ROOT_NS, base.value.as_bytes())
473        };
474        let name = format!("oplog-index-{oplog_index}");
475        Self::from_uuid(Uuid::new_v5(&namespace, name.as_bytes()))
476    }
477}
478
479impl Serialize for IdempotencyKey {
480    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
481    where
482        S: Serializer,
483    {
484        self.value.serialize(serializer)
485    }
486}
487
488impl<'de> Deserialize<'de> for IdempotencyKey {
489    fn deserialize<D>(deserializer: D) -> Result<IdempotencyKey, D::Error>
490    where
491        D: Deserializer<'de>,
492    {
493        let value = String::deserialize(deserializer)?;
494        Ok(IdempotencyKey { value })
495    }
496}
497
498impl Display for IdempotencyKey {
499    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
500        write!(f, "{}", self.value)
501    }
502}
503
504#[derive(Clone, Debug)]
505pub struct WorkerMetadata {
506    pub worker_id: WorkerId,
507    pub args: Vec<String>,
508    pub env: Vec<(String, String)>,
509    pub project_id: ProjectId,
510    pub created_by: AccountId,
511    pub created_at: Timestamp,
512    pub parent: Option<WorkerId>,
513    pub last_known_status: WorkerStatusRecord,
514}
515
516impl WorkerMetadata {
517    pub fn default(
518        worker_id: WorkerId,
519        created_by: AccountId,
520        project_id: ProjectId,
521    ) -> WorkerMetadata {
522        WorkerMetadata {
523            worker_id,
524            args: vec![],
525            env: vec![],
526            project_id,
527            created_by,
528            created_at: Timestamp::now_utc(),
529            parent: None,
530            last_known_status: WorkerStatusRecord::default(),
531        }
532    }
533
534    pub fn owned_worker_id(&self) -> OwnedWorkerId {
535        OwnedWorkerId::new(&self.project_id, &self.worker_id)
536    }
537}
538
539impl IntoValue for WorkerMetadata {
540    fn into_value(self) -> Value {
541        Value::Record(vec![
542            self.worker_id.into_value(),
543            self.args.into_value(),
544            self.env.into_value(),
545            self.last_known_status.status.into_value(),
546            self.last_known_status.component_version.into_value(),
547            0u64.into_value(), // retry count could be computed from the worker status record here but we don't support it yet
548        ])
549    }
550
551    fn get_type() -> AnalysedType {
552        record(vec![
553            field("worker-id", WorkerId::get_type()),
554            field("args", list(str())),
555            field("env", list(tuple(vec![str(), str()]))),
556            field("status", WorkerStatus::get_type()),
557            field("component-version", u64()),
558            field("retry-count", u64()),
559        ])
560    }
561}
562
/// Description of a resource owned by a worker; used as the value type of
/// `WorkerStatusRecord::owned_resources`.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct WorkerResourceDescription {
    /// Presumably the time the resource was created — confirm against the writer.
    pub created_at: Timestamp,
    /// Optional indexed-resource key associated with the resource.
    pub indexed_resource_key: Option<IndexedResourceKey>,
}
568
/// Retry configuration.
///
/// The two delay fields are (de)serialized by serde through `humantime_serde`.
/// NOTE(review): how the fields are combined into an actual delay is not visible
/// in this file — confirm against the retry implementation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)]
pub struct RetryConfig {
    pub max_attempts: u32,
    #[serde(with = "humantime_serde")]
    pub min_delay: Duration,
    #[serde(with = "humantime_serde")]
    pub max_delay: Duration,
    pub multiplier: f64,
    pub max_jitter_factor: Option<f64>,
}
579
/// Contains status information about a worker according to a given oplog index.
///
/// This status is just cached information, all fields must be computable by the oplog alone.
/// By having an associated oplog_idx, the cached information can be used together with the
/// tail of the oplog to determine the actual status of the worker.
///
/// `Encode` is derived, while `Decode` and `BorrowDecode` are implemented by hand in this
/// file and read the fields in declaration order; keep them in sync when changing fields.
#[derive(Clone, Debug, PartialEq, Encode)]
pub struct WorkerStatusRecord {
    pub status: WorkerStatus,
    pub skipped_regions: DeletedRegions,
    pub overridden_retry_config: Option<RetryConfig>,
    pub pending_invocations: Vec<TimestampedWorkerInvocation>,
    pub pending_updates: VecDeque<TimestampedUpdateDescription>,
    pub failed_updates: Vec<FailedUpdateRecord>,
    pub successful_updates: Vec<SuccessfulUpdateRecord>,
    pub invocation_results: HashMap<IdempotencyKey, OplogIndex>,
    pub current_idempotency_key: Option<IdempotencyKey>,
    pub component_version: ComponentVersion,
    pub component_size: u64,
    pub total_linear_memory_size: u64,
    pub owned_resources: HashMap<WorkerResourceId, WorkerResourceDescription>,
    pub oplog_idx: OplogIndex,
    pub active_plugins: HashSet<PluginInstallationId>,
    pub deleted_regions: DeletedRegions,
    /// The component version at the starting point of the replay. Will be the version of the Create oplog entry
    /// if only automatic updates were used or the version of the latest snapshot based update
    pub component_version_for_replay: ComponentVersion,
}
607
/// Hand-written decoder that reads every field in struct declaration order.
///
/// The initializer expressions run top to bottom, so their order is the order in which
/// bytes are consumed and must not be rearranged.
/// NOTE(review): this presumably mirrors the layout written by the derived `Encode` — confirm.
impl<Context> bincode::Decode<Context> for WorkerStatusRecord {
    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
        Ok(Self {
            status: Decode::decode(decoder)?,
            skipped_regions: Decode::decode(decoder)?,
            overridden_retry_config: Decode::decode(decoder)?,
            pending_invocations: Decode::decode(decoder)?,
            pending_updates: Decode::decode(decoder)?,
            failed_updates: Decode::decode(decoder)?,
            successful_updates: Decode::decode(decoder)?,
            invocation_results: Decode::decode(decoder)?,
            current_idempotency_key: Decode::decode(decoder)?,
            component_version: Decode::decode(decoder)?,
            component_size: Decode::decode(decoder)?,
            total_linear_memory_size: Decode::decode(decoder)?,
            owned_resources: Decode::decode(decoder)?,
            oplog_idx: Decode::decode(decoder)?,
            active_plugins: Decode::decode(decoder)?,
            deleted_regions: Decode::decode(decoder)?,
            component_version_for_replay: Decode::decode(decoder)?,
        })
    }
}
/// Borrowing counterpart of the hand-written `Decode` implementation.
///
/// Fields are read in struct declaration order; the initializer expressions run top to
/// bottom, so their order is the order in which bytes are consumed.
impl<'de, Context> BorrowDecode<'de, Context> for WorkerStatusRecord {
    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
        decoder: &mut D,
    ) -> Result<Self, DecodeError> {
        Ok(Self {
            status: BorrowDecode::borrow_decode(decoder)?,
            skipped_regions: BorrowDecode::borrow_decode(decoder)?,
            overridden_retry_config: BorrowDecode::borrow_decode(decoder)?,
            pending_invocations: BorrowDecode::borrow_decode(decoder)?,
            pending_updates: BorrowDecode::borrow_decode(decoder)?,
            failed_updates: BorrowDecode::borrow_decode(decoder)?,
            successful_updates: BorrowDecode::borrow_decode(decoder)?,
            invocation_results: BorrowDecode::borrow_decode(decoder)?,
            current_idempotency_key: BorrowDecode::borrow_decode(decoder)?,
            component_version: BorrowDecode::borrow_decode(decoder)?,
            component_size: BorrowDecode::borrow_decode(decoder)?,
            total_linear_memory_size: BorrowDecode::borrow_decode(decoder)?,
            owned_resources: BorrowDecode::borrow_decode(decoder)?,
            oplog_idx: BorrowDecode::borrow_decode(decoder)?,
            active_plugins: BorrowDecode::borrow_decode(decoder)?,
            deleted_regions: BorrowDecode::borrow_decode(decoder)?,
            component_version_for_replay: BorrowDecode::borrow_decode(decoder)?,
        })
    }
}
656
657impl Default for WorkerStatusRecord {
658    fn default() -> Self {
659        WorkerStatusRecord {
660            status: WorkerStatus::Idle,
661            skipped_regions: DeletedRegions::new(),
662            overridden_retry_config: None,
663            pending_invocations: Vec::new(),
664            pending_updates: VecDeque::new(),
665            failed_updates: Vec::new(),
666            successful_updates: Vec::new(),
667            invocation_results: HashMap::new(),
668            current_idempotency_key: None,
669            component_version: 0,
670            component_size: 0,
671            total_linear_memory_size: 0,
672            owned_resources: HashMap::new(),
673            oplog_idx: OplogIndex::default(),
674            active_plugins: HashSet::new(),
675            deleted_regions: DeletedRegions::new(),
676            component_version_for_replay: 0,
677        }
678    }
679}
680
/// Entry type of `WorkerStatusRecord::failed_updates`.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct FailedUpdateRecord {
    pub timestamp: Timestamp,
    /// Component version the update was targeting.
    pub target_version: ComponentVersion,
    /// Optional free-form details; presumably the failure reason — confirm against the writer.
    pub details: Option<String>,
}
687
/// Entry type of `WorkerStatusRecord::successful_updates`.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct SuccessfulUpdateRecord {
    pub timestamp: Timestamp,
    /// Component version the update was targeting.
    pub target_version: ComponentVersion,
}
693
/// Represents last known status of a worker
///
/// This is always recorded together with the current oplog index, and it can only be used
/// as a source of truth if there are no newer oplog entries since the record.
///
/// The `Ord` implementation in this file compares statuses through their `i32` mapping
/// (`From<WorkerStatus> for i32`), which numbers the variants 0..=6 in declaration order.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum WorkerStatus {
    /// The worker is running an invoked function
    Running,
    /// The worker is ready to run an invoked function
    Idle,
    /// An invocation is active but waiting for something (sleeping, waiting for a promise)
    Suspended,
    /// The last invocation was interrupted but will be resumed
    Interrupted,
    /// The last invocation failed and a retry was scheduled
    Retrying,
    /// The last invocation failed and the worker can no longer be used
    Failed,
    /// The worker exited after a successful invocation and can no longer be invoked
    Exited,
}
716
717impl PartialOrd for WorkerStatus {
718    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
719        Some(self.cmp(other))
720    }
721}
722
723impl Ord for WorkerStatus {
724    fn cmp(&self, other: &Self) -> Ordering {
725        let v1: i32 = self.clone().into();
726        let v2: i32 = other.clone().into();
727        v1.cmp(&v2)
728    }
729}
730
731impl FromStr for WorkerStatus {
732    type Err = String;
733
734    fn from_str(s: &str) -> Result<Self, Self::Err> {
735        match s.to_lowercase().as_str() {
736            "running" => Ok(WorkerStatus::Running),
737            "idle" => Ok(WorkerStatus::Idle),
738            "suspended" => Ok(WorkerStatus::Suspended),
739            "interrupted" => Ok(WorkerStatus::Interrupted),
740            "retrying" => Ok(WorkerStatus::Retrying),
741            "failed" => Ok(WorkerStatus::Failed),
742            "exited" => Ok(WorkerStatus::Exited),
743            _ => Err(format!("Unknown worker status: {s}")),
744        }
745    }
746}
747
748impl Display for WorkerStatus {
749    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
750        match self {
751            WorkerStatus::Running => write!(f, "Running"),
752            WorkerStatus::Idle => write!(f, "Idle"),
753            WorkerStatus::Suspended => write!(f, "Suspended"),
754            WorkerStatus::Interrupted => write!(f, "Interrupted"),
755            WorkerStatus::Retrying => write!(f, "Retrying"),
756            WorkerStatus::Failed => write!(f, "Failed"),
757            WorkerStatus::Exited => write!(f, "Exited"),
758        }
759    }
760}
761
762impl TryFrom<i32> for WorkerStatus {
763    type Error = String;
764
765    fn try_from(value: i32) -> Result<Self, Self::Error> {
766        match value {
767            0 => Ok(WorkerStatus::Running),
768            1 => Ok(WorkerStatus::Idle),
769            2 => Ok(WorkerStatus::Suspended),
770            3 => Ok(WorkerStatus::Interrupted),
771            4 => Ok(WorkerStatus::Retrying),
772            5 => Ok(WorkerStatus::Failed),
773            6 => Ok(WorkerStatus::Exited),
774            _ => Err(format!("Unknown worker status: {value}")),
775        }
776    }
777}
778
779impl From<WorkerStatus> for i32 {
780    fn from(value: WorkerStatus) -> Self {
781        match value {
782            WorkerStatus::Running => 0,
783            WorkerStatus::Idle => 1,
784            WorkerStatus::Suspended => 2,
785            WorkerStatus::Interrupted => 3,
786            WorkerStatus::Retrying => 4,
787            WorkerStatus::Failed => 5,
788            WorkerStatus::Exited => 6,
789        }
790    }
791}
792
/// Internal representation of `WorkerInvocation` to support backward compatibility
/// in its binary format.
///
/// NOTE(review): with derived `Encode`/`Decode` the variant order is presumably part of
/// the binary format — confirm before reordering or removing variants.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
enum SerializedWorkerInvocation {
    /// Older shape without an invocation context. It is never produced when converting
    /// from `WorkerInvocation`; when converted back it becomes
    /// `WorkerInvocation::ExportedFunction` with a fresh invocation context stack.
    ExportedFunctionV1 {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
    },
    /// Counterpart of `WorkerInvocation::ManualUpdate`.
    ManualUpdate {
        target_version: ComponentVersion,
    },
    /// Counterpart of `WorkerInvocation::ExportedFunction`, including the invocation context.
    ExportedFunction {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
812
813impl From<WorkerInvocation> for SerializedWorkerInvocation {
814    fn from(value: WorkerInvocation) -> Self {
815        match value {
816            WorkerInvocation::ManualUpdate { target_version } => {
817                Self::ManualUpdate { target_version }
818            }
819            WorkerInvocation::ExportedFunction {
820                idempotency_key,
821                full_function_name,
822                function_input,
823                invocation_context,
824            } => Self::ExportedFunction {
825                idempotency_key,
826                full_function_name,
827                function_input,
828                invocation_context,
829            },
830        }
831    }
832}
833
834impl From<SerializedWorkerInvocation> for WorkerInvocation {
835    fn from(value: SerializedWorkerInvocation) -> Self {
836        match value {
837            SerializedWorkerInvocation::ExportedFunctionV1 {
838                idempotency_key,
839                full_function_name,
840                function_input,
841            } => Self::ExportedFunction {
842                idempotency_key,
843                full_function_name,
844                function_input,
845                invocation_context: InvocationContextStack::fresh(),
846            },
847            SerializedWorkerInvocation::ManualUpdate { target_version } => {
848                Self::ManualUpdate { target_version }
849            }
850            SerializedWorkerInvocation::ExportedFunction {
851                idempotency_key,
852                full_function_name,
853                function_input,
854                invocation_context,
855            } => Self::ExportedFunction {
856                idempotency_key,
857                full_function_name,
858                function_input,
859                invocation_context,
860            },
861        }
862    }
863}
864
/// An invocation of a worker.
///
/// Binary encoding and decoding go through `SerializedWorkerInvocation` (see the
/// `Encode`/`Decode` implementations in this file) rather than being derived here.
#[derive(Clone, Debug, PartialEq)]
pub enum WorkerInvocation {
    /// A manual update to the given component version.
    ManualUpdate {
        target_version: ComponentVersion,
    },
    /// A call of an exported function, identified by an idempotency key.
    ExportedFunction {
        idempotency_key: IdempotencyKey,
        full_function_name: String,
        function_input: Vec<Value>,
        invocation_context: InvocationContextStack,
    },
}
877
878impl WorkerInvocation {
879    pub fn is_idempotency_key(&self, key: &IdempotencyKey) -> bool {
880        match self {
881            Self::ExportedFunction {
882                idempotency_key, ..
883            } => idempotency_key == key,
884            _ => false,
885        }
886    }
887
888    pub fn idempotency_key(&self) -> Option<&IdempotencyKey> {
889        match self {
890            Self::ExportedFunction {
891                idempotency_key, ..
892            } => Some(idempotency_key),
893            _ => None,
894        }
895    }
896
897    pub fn invocation_context(&self) -> InvocationContextStack {
898        match self {
899            Self::ExportedFunction {
900                invocation_context, ..
901            } => invocation_context.clone(),
902            _ => InvocationContextStack::fresh(),
903        }
904    }
905}
906
907impl Encode for WorkerInvocation {
908    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
909        let serialized: SerializedWorkerInvocation = self.clone().into();
910        serialized.encode(encoder)
911    }
912}
913
914impl<Context> Decode<Context> for WorkerInvocation {
915    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
916        let serialized: SerializedWorkerInvocation = Decode::decode(decoder)?;
917        Ok(serialized.into())
918    }
919}
920
921impl<'de, Context> BorrowDecode<'de, Context> for WorkerInvocation {
922    fn borrow_decode<D: BorrowDecoder<'de, Context = Context>>(
923        decoder: &mut D,
924    ) -> Result<Self, DecodeError> {
925        let serialized: SerializedWorkerInvocation = BorrowDecode::borrow_decode(decoder)?;
926        Ok(serialized.into())
927    }
928}
929
/// A `WorkerInvocation` paired with a `Timestamp`.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct TimestampedWorkerInvocation {
    /// Timestamp associated with the invocation.
    pub timestamp: Timestamp,
    /// The invocation itself.
    pub invocation: WorkerInvocation,
}
935
/// String-backed account identifier.
///
/// Because of `#[serde(transparent)]` it is (de)serialized by serde as the bare string
/// rather than as a struct with a `value` field.
#[derive(
    Clone,
    Debug,
    PartialOrd,
    Ord,
    derive_more::FromStr,
    Eq,
    Hash,
    PartialEq,
    Serialize,
    Deserialize,
    Encode,
    Decode,
    IntoValue,
)]
#[serde(transparent)]
pub struct AccountId {
    /// The raw identifier.
    pub value: String,
}
955
956impl AccountId {
957    pub fn generate() -> Self {
958        Self {
959            value: Uuid::new_v4().to_string(),
960        }
961    }
962}
963
964impl From<&str> for AccountId {
965    fn from(value: &str) -> Self {
966        Self {
967            value: value.to_string(),
968        }
969    }
970}
971
972impl Display for AccountId {
973    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
974        write!(f, "{}", &self.value)
975    }
976}
977
/// Filter comparing a worker's name against `value` using a string comparator.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerNameFilter {
    /// How the worker name is compared with `value`.
    pub comparator: StringFilterComparator,
    /// The value the worker name is compared against.
    pub value: String,
}
986
987impl WorkerNameFilter {
988    pub fn new(comparator: StringFilterComparator, value: String) -> Self {
989        Self { comparator, value }
990    }
991}
992
993impl Display for WorkerNameFilter {
994    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
995        write!(f, "name {} {}", self.comparator, self.value)
996    }
997}
998
/// Filter comparing a worker's last known status against `value`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerStatusFilter {
    /// How the worker status is compared with `value`.
    pub comparator: FilterComparator,
    /// The status the worker status is compared against.
    pub value: WorkerStatus,
}
1007
1008impl WorkerStatusFilter {
1009    pub fn new(comparator: FilterComparator, value: WorkerStatus) -> Self {
1010        Self { comparator, value }
1011    }
1012}
1013
1014impl Display for WorkerStatusFilter {
1015    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1016        write!(f, "status == {:?}", self.value)
1017    }
1018}
1019
/// Filter comparing a worker's component version against `value`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerVersionFilter {
    /// How the worker's component version is compared with `value`.
    pub comparator: FilterComparator,
    /// The version the worker's component version is compared against.
    pub value: ComponentVersion,
}
1028
1029impl WorkerVersionFilter {
1030    pub fn new(comparator: FilterComparator, value: ComponentVersion) -> Self {
1031        Self { comparator, value }
1032    }
1033}
1034
1035impl Display for WorkerVersionFilter {
1036    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1037        write!(f, "version {} {}", self.comparator, self.value)
1038    }
1039}
1040
/// Filter comparing a worker's creation timestamp against `value`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerCreatedAtFilter {
    /// How the worker's creation timestamp is compared with `value`.
    pub comparator: FilterComparator,
    /// The timestamp the worker's creation timestamp is compared against.
    pub value: Timestamp,
}
1049
1050impl WorkerCreatedAtFilter {
1051    pub fn new(comparator: FilterComparator, value: Timestamp) -> Self {
1052        Self { comparator, value }
1053    }
1054}
1055
1056impl Display for WorkerCreatedAtFilter {
1057    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1058        write!(f, "created_at {} {}", self.comparator, self.value)
1059    }
1060}
1061
/// Filter comparing the value of one of a worker's environment variables against `value`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerEnvFilter {
    /// Name of the environment variable; `WorkerFilter::matches` looks it up
    /// case-insensitively.
    pub name: String,
    /// How the variable's value is compared with `value`.
    pub comparator: StringFilterComparator,
    /// The value the variable's value is compared against.
    pub value: String,
}
1071
1072impl WorkerEnvFilter {
1073    pub fn new(name: String, comparator: StringFilterComparator, value: String) -> Self {
1074        Self {
1075            name,
1076            comparator,
1077            value,
1078        }
1079    }
1080}
1081
1082impl Display for WorkerEnvFilter {
1083    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1084        write!(f, "env.{} {} {}", self.name, self.comparator, self.value)
1085    }
1086}
1087
/// Conjunction of filters; `WorkerFilter::matches` accepts a worker only when every nested
/// filter matches (an empty list therefore matches).
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerAndFilter {
    /// The filters that all have to match.
    pub filters: Vec<WorkerFilter>,
}
1095
1096impl WorkerAndFilter {
1097    pub fn new(filters: Vec<WorkerFilter>) -> Self {
1098        Self { filters }
1099    }
1100}
1101
1102impl Display for WorkerAndFilter {
1103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104        write!(
1105            f,
1106            "({})",
1107            self.filters
1108                .iter()
1109                .map(|f| f.clone().to_string())
1110                .collect::<Vec<String>>()
1111                .join(" AND ")
1112        )
1113    }
1114}
1115
/// Disjunction of filters; `WorkerFilter::matches` accepts a worker when at least one nested
/// filter matches. An empty list is treated as matching.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerOrFilter {
    /// The filters of which at least one has to match.
    pub filters: Vec<WorkerFilter>,
}
1123
1124impl WorkerOrFilter {
1125    pub fn new(filters: Vec<WorkerFilter>) -> Self {
1126        Self { filters }
1127    }
1128}
1129
1130impl Display for WorkerOrFilter {
1131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1132        write!(
1133            f,
1134            "({})",
1135            self.filters
1136                .iter()
1137                .map(|f| f.clone().to_string())
1138                .collect::<Vec<String>>()
1139                .join(" OR ")
1140        )
1141    }
1142}
1143
/// Negation of a filter; `WorkerFilter::matches` accepts a worker exactly when the wrapped
/// filter does not match.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct WorkerNotFilter {
    // The negated filter. Boxed because `WorkerFilter` itself contains `WorkerNotFilter`.
    filter: Box<WorkerFilter>,
}
1151
1152impl WorkerNotFilter {
1153    pub fn new(filter: WorkerFilter) -> Self {
1154        Self {
1155            filter: Box::new(filter),
1156        }
1157    }
1158}
1159
1160impl Display for WorkerNotFilter {
1161    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1162        write!(f, "NOT ({})", self.filter)
1163    }
1164}
1165
/// A predicate over worker metadata, evaluated by `WorkerFilter::matches`.
///
/// Leaf variants compare a single property of the worker; `And`, `Or` and `Not` combine
/// other filters. With serde the enum is tagged through a `type` field
/// (`#[serde(tag = "type")]`).
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum WorkerFilter {
    /// Compares the worker name.
    Name(WorkerNameFilter),
    /// Compares the worker's last known status.
    Status(WorkerStatusFilter),
    /// Compares the worker's component version.
    Version(WorkerVersionFilter),
    /// Compares the worker's creation timestamp.
    CreatedAt(WorkerCreatedAtFilter),
    /// Compares the value of an environment variable of the worker.
    Env(WorkerEnvFilter),
    /// All nested filters have to match.
    And(WorkerAndFilter),
    /// At least one nested filter has to match.
    Or(WorkerOrFilter),
    /// The nested filter must not match.
    Not(WorkerNotFilter),
}
1180
1181impl WorkerFilter {
1182    pub fn and(&self, filter: WorkerFilter) -> Self {
1183        match self.clone() {
1184            WorkerFilter::And(WorkerAndFilter { filters }) => {
1185                Self::new_and([filters, vec![filter]].concat())
1186            }
1187            f => Self::new_and(vec![f, filter]),
1188        }
1189    }
1190
1191    pub fn or(&self, filter: WorkerFilter) -> Self {
1192        match self.clone() {
1193            WorkerFilter::Or(WorkerOrFilter { filters }) => {
1194                Self::new_or([filters, vec![filter]].concat())
1195            }
1196            f => Self::new_or(vec![f, filter]),
1197        }
1198    }
1199
1200    pub fn not(&self) -> Self {
1201        Self::new_not(self.clone())
1202    }
1203
1204    pub fn matches(&self, metadata: &WorkerMetadata) -> bool {
1205        match self.clone() {
1206            WorkerFilter::Name(WorkerNameFilter { comparator, value }) => {
1207                comparator.matches(&metadata.worker_id.worker_name, &value)
1208            }
1209            WorkerFilter::Version(WorkerVersionFilter { comparator, value }) => {
1210                let version: ComponentVersion = metadata.last_known_status.component_version;
1211                comparator.matches(&version, &value)
1212            }
1213            WorkerFilter::Env(WorkerEnvFilter {
1214                name,
1215                comparator,
1216                value,
1217            }) => {
1218                let mut result = false;
1219                let name = name.to_lowercase();
1220                for env_value in metadata.env.clone() {
1221                    if env_value.0.to_lowercase() == name {
1222                        result = comparator.matches(&env_value.1, &value);
1223
1224                        break;
1225                    }
1226                }
1227                result
1228            }
1229            WorkerFilter::CreatedAt(WorkerCreatedAtFilter { comparator, value }) => {
1230                comparator.matches(&metadata.created_at, &value)
1231            }
1232            WorkerFilter::Status(WorkerStatusFilter { comparator, value }) => {
1233                comparator.matches(&metadata.last_known_status.status, &value)
1234            }
1235            WorkerFilter::Not(WorkerNotFilter { filter }) => !filter.matches(metadata),
1236            WorkerFilter::And(WorkerAndFilter { filters }) => {
1237                let mut result = true;
1238                for filter in filters {
1239                    if !filter.matches(metadata) {
1240                        result = false;
1241                        break;
1242                    }
1243                }
1244                result
1245            }
1246            WorkerFilter::Or(WorkerOrFilter { filters }) => {
1247                let mut result = true;
1248                if !filters.is_empty() {
1249                    result = false;
1250                    for filter in filters {
1251                        if filter.matches(metadata) {
1252                            result = true;
1253                            break;
1254                        }
1255                    }
1256                }
1257                result
1258            }
1259        }
1260    }
1261
1262    pub fn new_and(filters: Vec<WorkerFilter>) -> Self {
1263        WorkerFilter::And(WorkerAndFilter::new(filters))
1264    }
1265
1266    pub fn new_or(filters: Vec<WorkerFilter>) -> Self {
1267        WorkerFilter::Or(WorkerOrFilter::new(filters))
1268    }
1269
1270    pub fn new_not(filter: WorkerFilter) -> Self {
1271        WorkerFilter::Not(WorkerNotFilter::new(filter))
1272    }
1273
1274    pub fn new_name(comparator: StringFilterComparator, value: String) -> Self {
1275        WorkerFilter::Name(WorkerNameFilter::new(comparator, value))
1276    }
1277
1278    pub fn new_env(name: String, comparator: StringFilterComparator, value: String) -> Self {
1279        WorkerFilter::Env(WorkerEnvFilter::new(name, comparator, value))
1280    }
1281
1282    pub fn new_version(comparator: FilterComparator, value: ComponentVersion) -> Self {
1283        WorkerFilter::Version(WorkerVersionFilter::new(comparator, value))
1284    }
1285
1286    pub fn new_status(comparator: FilterComparator, value: WorkerStatus) -> Self {
1287        WorkerFilter::Status(WorkerStatusFilter::new(comparator, value))
1288    }
1289
1290    pub fn new_created_at(comparator: FilterComparator, value: Timestamp) -> Self {
1291        WorkerFilter::CreatedAt(WorkerCreatedAtFilter::new(comparator, value))
1292    }
1293
1294    pub fn from(filters: Vec<String>) -> Result<WorkerFilter, String> {
1295        let mut fs = Vec::new();
1296        for f in filters {
1297            fs.push(WorkerFilter::from_str(&f)?);
1298        }
1299        Ok(WorkerFilter::new_and(fs))
1300    }
1301}
1302
1303impl Display for WorkerFilter {
1304    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1305        match self {
1306            WorkerFilter::Name(filter) => {
1307                write!(f, "{filter}")
1308            }
1309            WorkerFilter::Version(filter) => {
1310                write!(f, "{filter}")
1311            }
1312            WorkerFilter::Status(filter) => {
1313                write!(f, "{filter}")
1314            }
1315            WorkerFilter::CreatedAt(filter) => {
1316                write!(f, "{filter}")
1317            }
1318            WorkerFilter::Env(filter) => {
1319                write!(f, "{filter}")
1320            }
1321            WorkerFilter::Not(filter) => {
1322                write!(f, "{filter}")
1323            }
1324            WorkerFilter::And(filter) => {
1325                write!(f, "{filter}")
1326            }
1327            WorkerFilter::Or(filter) => {
1328                write!(f, "{filter}")
1329            }
1330        }
1331    }
1332}
1333
1334impl FromStr for WorkerFilter {
1335    type Err = String;
1336
1337    fn from_str(s: &str) -> Result<Self, Self::Err> {
1338        let elements = s.split_whitespace().collect::<Vec<&str>>();
1339
1340        if elements.len() == 3 {
1341            let arg = elements[0];
1342            let comparator = elements[1];
1343            let value = elements[2];
1344            match arg {
1345                "name" => Ok(WorkerFilter::new_name(
1346                    comparator.parse()?,
1347                    value.to_string(),
1348                )),
1349                "version" => Ok(WorkerFilter::new_version(
1350                    comparator.parse()?,
1351                    value
1352                        .parse()
1353                        .map_err(|e| format!("Invalid filter value: {e}"))?,
1354                )),
1355                "status" => Ok(WorkerFilter::new_status(
1356                    comparator.parse()?,
1357                    value.parse()?,
1358                )),
1359                "created_at" | "createdAt" => Ok(WorkerFilter::new_created_at(
1360                    comparator.parse()?,
1361                    value.parse()?,
1362                )),
1363                _ if arg.starts_with("env.") => {
1364                    let name = &arg[4..];
1365                    Ok(WorkerFilter::new_env(
1366                        name.to_string(),
1367                        comparator.parse()?,
1368                        value.to_string(),
1369                    ))
1370                }
1371                _ => Err(format!("Invalid filter: {s}")),
1372            }
1373        } else {
1374            Err(format!("Invalid filter: {s}"))
1375        }
1376    }
1377}
1378
/// Comparison operators for string-valued filters.
///
/// See `StringFilterComparator::matches` for the exact semantics.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum StringFilterComparator {
    /// The string forms of both values are equal.
    Equal,
    /// The string forms of both values differ.
    NotEqual,
    /// The first value's string form contains the second one's as a substring.
    Like,
    /// The first value's string form does not contain the second one's.
    NotLike,
}
1387
1388impl StringFilterComparator {
1389    pub fn matches<T: Display>(&self, value1: &T, value2: &T) -> bool {
1390        match self {
1391            StringFilterComparator::Equal => value1.to_string() == value2.to_string(),
1392            StringFilterComparator::NotEqual => value1.to_string() != value2.to_string(),
1393            StringFilterComparator::Like => {
1394                value1.to_string().contains(value2.to_string().as_str())
1395            }
1396            StringFilterComparator::NotLike => {
1397                !value1.to_string().contains(value2.to_string().as_str())
1398            }
1399        }
1400    }
1401}
1402
1403impl FromStr for StringFilterComparator {
1404    type Err = String;
1405
1406    fn from_str(s: &str) -> Result<Self, Self::Err> {
1407        match s.to_lowercase().as_str() {
1408            "==" | "=" | "equal" | "eq" => Ok(StringFilterComparator::Equal),
1409            "!=" | "notequal" | "ne" => Ok(StringFilterComparator::NotEqual),
1410            "like" => Ok(StringFilterComparator::Like),
1411            "notlike" => Ok(StringFilterComparator::NotLike),
1412            _ => Err(format!("Unknown String Filter Comparator: {s}")),
1413        }
1414    }
1415}
1416
1417impl TryFrom<i32> for StringFilterComparator {
1418    type Error = String;
1419
1420    fn try_from(value: i32) -> Result<Self, Self::Error> {
1421        match value {
1422            0 => Ok(StringFilterComparator::Equal),
1423            1 => Ok(StringFilterComparator::NotEqual),
1424            2 => Ok(StringFilterComparator::Like),
1425            3 => Ok(StringFilterComparator::NotLike),
1426            _ => Err(format!("Unknown String Filter Comparator: {value}")),
1427        }
1428    }
1429}
1430
1431impl From<StringFilterComparator> for i32 {
1432    fn from(value: StringFilterComparator) -> Self {
1433        match value {
1434            StringFilterComparator::Equal => 0,
1435            StringFilterComparator::NotEqual => 1,
1436            StringFilterComparator::Like => 2,
1437            StringFilterComparator::NotLike => 3,
1438        }
1439    }
1440}
1441
1442impl Display for StringFilterComparator {
1443    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1444        let s = match self {
1445            StringFilterComparator::Equal => "==",
1446            StringFilterComparator::NotEqual => "!=",
1447            StringFilterComparator::Like => "like",
1448            StringFilterComparator::NotLike => "notlike",
1449        };
1450        write!(f, "{s}")
1451    }
1452}
1453
/// Comparison operators for ordered (`Ord`) filter values.
///
/// Note that the `i32` conversions of this type use their own numbering, which does not
/// follow the declaration order of the variants.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
pub enum FilterComparator {
    /// `==`
    Equal,
    /// `!=`
    NotEqual,
    /// `>=`
    GreaterEqual,
    /// `>`
    Greater,
    /// `<=`
    LessEqual,
    /// `<`
    Less,
}
1464
1465impl Display for FilterComparator {
1466    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1467        let s = match self {
1468            FilterComparator::Equal => "==",
1469            FilterComparator::NotEqual => "!=",
1470            FilterComparator::GreaterEqual => ">=",
1471            FilterComparator::Greater => ">",
1472            FilterComparator::LessEqual => "<=",
1473            FilterComparator::Less => "<",
1474        };
1475        write!(f, "{s}")
1476    }
1477}
1478
1479impl FilterComparator {
1480    pub fn matches<T: Ord>(&self, value1: &T, value2: &T) -> bool {
1481        match self {
1482            FilterComparator::Equal => value1 == value2,
1483            FilterComparator::NotEqual => value1 != value2,
1484            FilterComparator::Less => value1 < value2,
1485            FilterComparator::LessEqual => value1 <= value2,
1486            FilterComparator::Greater => value1 > value2,
1487            FilterComparator::GreaterEqual => value1 >= value2,
1488        }
1489    }
1490}
1491
1492impl FromStr for FilterComparator {
1493    type Err = String;
1494    fn from_str(s: &str) -> Result<Self, Self::Err> {
1495        match s.to_lowercase().as_str() {
1496            "==" | "=" | "equal" | "eq" => Ok(FilterComparator::Equal),
1497            "!=" | "notequal" | "ne" => Ok(FilterComparator::NotEqual),
1498            ">=" | "greaterequal" | "ge" => Ok(FilterComparator::GreaterEqual),
1499            ">" | "greater" | "gt" => Ok(FilterComparator::Greater),
1500            "<=" | "lessequal" | "le" => Ok(FilterComparator::LessEqual),
1501            "<" | "less" | "lt" => Ok(FilterComparator::Less),
1502            _ => Err(format!("Unknown Filter Comparator: {s}")),
1503        }
1504    }
1505}
1506
1507impl TryFrom<i32> for FilterComparator {
1508    type Error = String;
1509
1510    fn try_from(value: i32) -> Result<Self, Self::Error> {
1511        match value {
1512            0 => Ok(FilterComparator::Equal),
1513            1 => Ok(FilterComparator::NotEqual),
1514            2 => Ok(FilterComparator::Less),
1515            3 => Ok(FilterComparator::LessEqual),
1516            4 => Ok(FilterComparator::Greater),
1517            5 => Ok(FilterComparator::GreaterEqual),
1518            _ => Err(format!("Unknown Filter Comparator: {value}")),
1519        }
1520    }
1521}
1522
1523impl From<FilterComparator> for i32 {
1524    fn from(value: FilterComparator) -> Self {
1525        match value {
1526            FilterComparator::Equal => 0,
1527            FilterComparator::NotEqual => 1,
1528            FilterComparator::Less => 2,
1529            FilterComparator::LessEqual => 3,
1530            FilterComparator::Greater => 4,
1531            FilterComparator::GreaterEqual => 5,
1532        }
1533    }
1534}
1535
/// Position of a scan, made of a layer index and a cursor within that layer.
///
/// A `cursor` of `0` means the active layer is finished; when `layer` is `0` as well the
/// whole scan is finished (see `is_active_layer_finished` and `is_finished`). The textual
/// form is `layer/cursor`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, Default)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ScanCursor {
    /// Cursor within the current layer.
    pub cursor: u64,
    /// Index of the current layer.
    pub layer: usize,
}
1544
1545impl ScanCursor {
1546    pub fn is_active_layer_finished(&self) -> bool {
1547        self.cursor == 0
1548    }
1549
1550    pub fn is_finished(&self) -> bool {
1551        self.cursor == 0 && self.layer == 0
1552    }
1553
1554    pub fn into_option(self) -> Option<Self> {
1555        if self.is_finished() {
1556            None
1557        } else {
1558            Some(self)
1559        }
1560    }
1561}
1562
1563impl Display for ScanCursor {
1564    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1565        write!(f, "{}/{}", self.layer, self.cursor)
1566    }
1567}
1568
1569impl FromStr for ScanCursor {
1570    type Err = String;
1571
1572    fn from_str(s: &str) -> Result<Self, Self::Err> {
1573        let parts = s.split('/').collect::<Vec<&str>>();
1574        if parts.len() == 2 {
1575            Ok(ScanCursor {
1576                layer: parts[0]
1577                    .parse()
1578                    .map_err(|e| format!("Invalid layer part: {e}"))?,
1579                cursor: parts[1]
1580                    .parse()
1581                    .map_err(|e| format!("Invalid cursor part: {e}"))?,
1582            })
1583        } else {
1584            Err("Invalid cursor, must have 'layer/cursor' format".to_string())
1585        }
1586    }
1587}
1588
/// Level of a log entry (used by `WorkerEvent::Log`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
    Critical,
}
1598
/// An event emitted for a worker: output chunks, log entries, invocation boundaries and
/// lag notifications.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Bytes written to the standard output.
    StdOut {
        timestamp: Timestamp,
        bytes: Vec<u8>,
    },
    /// Bytes written to the standard error.
    StdErr {
        timestamp: Timestamp,
        bytes: Vec<u8>,
    },
    /// A log entry with its level, context and message.
    Log {
        timestamp: Timestamp,
        level: LogLevel,
        context: String,
        message: String,
    },
    /// Start of an invocation of `function`.
    InvocationStart {
        timestamp: Timestamp,
        function: String,
        idempotency_key: IdempotencyKey,
    },
    /// End of an invocation of `function`.
    InvocationFinished {
        timestamp: Timestamp,
        function: String,
        idempotency_key: IdempotencyKey,
    },
    /// The client fell behind and the point where it left off is no longer in our buffer.
    /// `number_of_missed_messages` is the number of messages between the point where the
    /// client left off and the point it is at now.
    ClientLagged { number_of_missed_messages: u64 },
}
1629
1630impl Display for WorkerEvent {
1631    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1632        match self {
1633            WorkerEvent::StdOut { bytes, .. } => {
1634                write!(
1635                    f,
1636                    "<stdout> {}",
1637                    String::from_utf8(bytes.clone()).unwrap_or_default()
1638                )
1639            }
1640            WorkerEvent::StdErr { bytes, .. } => {
1641                write!(
1642                    f,
1643                    "<stderr> {}",
1644                    String::from_utf8(bytes.clone()).unwrap_or_default()
1645                )
1646            }
1647            WorkerEvent::Log {
1648                level,
1649                context,
1650                message,
1651                ..
1652            } => {
1653                write!(f, "<log> {level:?} {context} {message}")
1654            }
1655            WorkerEvent::InvocationStart {
1656                function,
1657                idempotency_key,
1658                ..
1659            } => {
1660                write!(f, "<invocation-start> {function} {idempotency_key}")
1661            }
1662            WorkerEvent::InvocationFinished {
1663                function,
1664                idempotency_key,
1665                ..
1666            } => {
1667                write!(f, "<invocation-finished> {function} {idempotency_key}")
1668            }
1669            WorkerEvent::ClientLagged {
1670                number_of_missed_messages,
1671            } => {
1672                write!(f, "<client-lagged> {number_of_missed_messages}")
1673            }
1674        }
1675    }
1676}
1677
/// Kind of a component. The explicit `i32` discriminants are the codes accepted by the
/// `TryFrom<i32>` conversion.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
#[repr(i32)]
pub enum ComponentType {
    Durable = 0,
    Ephemeral = 1,
}
1685
1686impl TryFrom<i32> for ComponentType {
1687    type Error = String;
1688
1689    fn try_from(value: i32) -> Result<Self, Self::Error> {
1690        match value {
1691            0 => Ok(ComponentType::Durable),
1692            1 => Ok(ComponentType::Ephemeral),
1693            _ => Err(format!("Unknown Component Type: {value}")),
1694        }
1695    }
1696}
1697
1698impl Display for ComponentType {
1699    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1700        let s = match self {
1701            ComponentType::Durable => "Durable",
1702            ComponentType::Ephemeral => "Ephemeral",
1703        };
1704        write!(f, "{s}")
1705    }
1706}
1707
1708impl FromStr for ComponentType {
1709    type Err = String;
1710
1711    fn from_str(s: &str) -> Result<Self, Self::Err> {
1712        match s {
1713            "Durable" => Ok(ComponentType::Durable),
1714            "Ephemeral" => Ok(ComponentType::Ephemeral),
1715            _ => Err(format!("Unknown Component Type: {s}")),
1716        }
1717    }
1718}
1719
/// A value without any fields; (de)serializes as an empty object.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct Empty {}
1725
/// Key that can be used to identify a component file.
///
/// All files with the same content have the same key. The key is a plain string wrapper
/// and is displayed as that string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::NewType))]
pub struct InitialComponentFileKey(pub String);
1731
1732impl Display for InitialComponentFileKey {
1733    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1734        self.0.fmt(f)
1735    }
1736}
1737
/// Path inside a component filesystem. Must
/// - be absolute (start with '/')
/// - not contain ".." components
/// - not contain "." components
/// - use '/' as a separator
///
/// The constructors reject non-absolute input and store the path in normalized form
/// (see `ComponentFilePath::from_abs_str`).
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct ComponentFilePath(Utf8UnixPathBuf);
1745
1746impl ComponentFilePath {
1747    pub fn from_abs_str(s: &str) -> Result<Self, String> {
1748        let buf: Utf8UnixPathBuf = s.into();
1749        if !buf.is_absolute() {
1750            return Err("Path must be absolute".to_string());
1751        }
1752
1753        Ok(ComponentFilePath(buf.normalize()))
1754    }
1755
1756    pub fn from_rel_str(s: &str) -> Result<Self, String> {
1757        Self::from_abs_str(&format!("/{s}"))
1758    }
1759
1760    pub fn from_either_str(s: &str) -> Result<Self, String> {
1761        if s.starts_with('/') {
1762            Self::from_abs_str(s)
1763        } else {
1764            Self::from_rel_str(s)
1765        }
1766    }
1767
1768    pub fn as_path(&self) -> &Utf8UnixPathBuf {
1769        &self.0
1770    }
1771
1772    pub fn to_rel_string(&self) -> String {
1773        self.0.strip_prefix("/").unwrap().to_string()
1774    }
1775
1776    pub fn extend(&mut self, path: &str) -> Result<(), String> {
1777        self.0.push_checked(path).map_err(|e| e.to_string())
1778    }
1779}
1780
1781impl Display for ComponentFilePath {
1782    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1783        self.0.fmt(f)
1784    }
1785}
1786
1787impl Serialize for ComponentFilePath {
1788    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1789    where
1790        S: Serializer,
1791    {
1792        String::serialize(&self.to_string(), serializer)
1793    }
1794}
1795
1796impl<'de> Deserialize<'de> for ComponentFilePath {
1797    fn deserialize<D>(deserializer: D) -> Result<ComponentFilePath, D::Error>
1798    where
1799        D: Deserializer<'de>,
1800    {
1801        let str = String::deserialize(deserializer)?;
1802        Self::from_abs_str(&str).map_err(de::Error::custom)
1803    }
1804}
1805
1806impl TryFrom<&str> for ComponentFilePath {
1807    type Error = String;
1808    fn try_from(value: &str) -> Result<Self, Self::Error> {
1809        Self::from_either_str(value)
1810    }
1811}
1812
/// Access permissions of a component file.
///
/// Variant names are renamed to kebab-case for serde (and for the OpenAPI schema when the
/// `poem` feature is enabled); the compact `ro` / `rw` forms are handled by
/// `as_compact_str` and `from_compact_str`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
#[serde(rename_all = "kebab-case")]
#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
pub enum ComponentFilePermissions {
    ReadOnly,
    ReadWrite,
}
1821
1822impl ComponentFilePermissions {
1823    pub fn as_compact_str(&self) -> &'static str {
1824        match self {
1825            ComponentFilePermissions::ReadOnly => "ro",
1826            ComponentFilePermissions::ReadWrite => "rw",
1827        }
1828    }
1829    pub fn from_compact_str(s: &str) -> Result<Self, String> {
1830        match s {
1831            "ro" => Ok(ComponentFilePermissions::ReadOnly),
1832            "rw" => Ok(ComponentFilePermissions::ReadWrite),
1833            _ => Err(format!("Unknown permissions: {s}")),
1834        }
1835    }
1836}
1837
/// A component file described by its content key, its path inside the component
/// filesystem and its permissions.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct InitialComponentFile {
    /// Key identifying the file.
    pub key: InitialComponentFileKey,
    /// Path of the file inside the component filesystem.
    pub path: ComponentFilePath,
    /// Permissions of the file.
    pub permissions: ComponentFilePermissions,
}
1847
1848impl InitialComponentFile {
1849    pub fn is_read_only(&self) -> bool {
1850        self.permissions == ComponentFilePermissions::ReadOnly
1851    }
1852}
1853
1854#[derive(Clone, Debug, Serialize, Deserialize)]
1855#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1856#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1857#[serde(rename_all = "camelCase")]
1858pub struct ComponentFilePathWithPermissions {
1859    pub path: ComponentFilePath,
1860    pub permissions: ComponentFilePermissions,
1861}
1862
1863impl ComponentFilePathWithPermissions {
1864    pub fn extend_path(&mut self, path: &str) -> Result<(), String> {
1865        self.path.extend(path)
1866    }
1867}
1868
1869impl Display for ComponentFilePathWithPermissions {
1870    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1871        write!(f, "{}", serde_json::to_string(self).unwrap())
1872    }
1873}
1874
1875#[derive(Clone, Debug, Serialize, Deserialize)]
1876#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1877#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1878#[serde(rename_all = "camelCase")]
1879pub struct ComponentFilePathWithPermissionsList {
1880    pub values: Vec<ComponentFilePathWithPermissions>,
1881}
1882
1883impl Display for ComponentFilePathWithPermissionsList {
1884    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1885        write!(f, "{}", serde_json::to_string(self).unwrap())
1886    }
1887}
1888
1889#[derive(Clone, Debug, PartialEq)]
1890pub enum ComponentFileSystemNodeDetails {
1891    File {
1892        permissions: ComponentFilePermissions,
1893        size: u64,
1894    },
1895    Directory,
1896}
1897
1898#[derive(Clone, Debug, PartialEq)]
1899pub struct ComponentFileSystemNode {
1900    pub name: String,
1901    pub last_modified: SystemTime,
1902    pub details: ComponentFileSystemNodeDetails,
1903}
1904
1905#[derive(Debug, Clone, PartialEq, Serialize, Encode, Decode, Default)]
1906#[cfg_attr(feature = "poem", derive(poem_openapi::Enum))]
1907#[serde(rename_all = "kebab-case")]
1908#[cfg_attr(feature = "poem", oai(rename_all = "kebab-case"))]
1909pub enum GatewayBindingType {
1910    #[default]
1911    Default,
1912    FileServer,
1913    HttpHandler,
1914    CorsPreflight,
1915}
1916
1917// To keep backward compatibility as we documented wit-worker to be default
1918impl<'de> Deserialize<'de> for GatewayBindingType {
1919    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1920    where
1921        D: Deserializer<'de>,
1922    {
1923        struct GatewayBindingTypeVisitor;
1924
1925        impl de::Visitor<'_> for GatewayBindingTypeVisitor {
1926            type Value = GatewayBindingType;
1927
1928            fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
1929                formatter.write_str("a string representing the binding type")
1930            }
1931
1932            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
1933            where
1934                E: de::Error,
1935            {
1936                match value {
1937                    "default" | "wit-worker" => Ok(GatewayBindingType::Default),
1938                    "file-server" => Ok(GatewayBindingType::FileServer),
1939                    "cors-preflight" => Ok(GatewayBindingType::CorsPreflight),
1940                    _ => Err(de::Error::invalid_value(Unexpected::Str(value), &self)),
1941                }
1942            }
1943        }
1944
1945        deserializer.deserialize_str(GatewayBindingTypeVisitor)
1946    }
1947}
1948
1949impl TryFrom<String> for GatewayBindingType {
1950    type Error = String;
1951
1952    fn try_from(value: String) -> Result<Self, Self::Error> {
1953        match value.as_str() {
1954            "default" => Ok(GatewayBindingType::Default),
1955            "file-server" => Ok(GatewayBindingType::FileServer),
1956            _ => Err(format!("Invalid WorkerBindingType: {value}")),
1957        }
1958    }
1959}
1960
1961impl From<WorkerId> for golem_wasm_rpc::WorkerId {
1962    fn from(worker_id: WorkerId) -> Self {
1963        golem_wasm_rpc::WorkerId {
1964            component_id: worker_id.component_id.into(),
1965            worker_name: worker_id.worker_name,
1966        }
1967    }
1968}
1969
1970impl From<golem_wasm_rpc::WorkerId> for WorkerId {
1971    fn from(host: golem_wasm_rpc::WorkerId) -> Self {
1972        Self {
1973            component_id: host.component_id.into(),
1974            worker_name: host.worker_name,
1975        }
1976    }
1977}
1978
1979impl From<golem_wasm_rpc::ComponentId> for ComponentId {
1980    fn from(host: golem_wasm_rpc::ComponentId) -> Self {
1981        let high_bits = host.uuid.high_bits;
1982        let low_bits = host.uuid.low_bits;
1983
1984        Self(Uuid::from_u64_pair(high_bits, low_bits))
1985    }
1986}
1987
1988impl From<ComponentId> for golem_wasm_rpc::ComponentId {
1989    fn from(component_id: ComponentId) -> Self {
1990        let (high_bits, low_bits) = component_id.0.as_u64_pair();
1991
1992        golem_wasm_rpc::ComponentId {
1993            uuid: golem_wasm_rpc::Uuid {
1994                high_bits,
1995                low_bits,
1996            },
1997        }
1998    }
1999}
2000
2001#[cfg(test)]
2002mod tests {
2003    use std::collections::HashSet;
2004    use std::str::FromStr;
2005    use std::time::SystemTime;
2006    use std::vec;
2007    use test_r::test;
2008    use tracing::info;
2009
2010    use crate::model::oplog::OplogIndex;
2011
2012    use crate::model::{
2013        AccountId, ComponentFilePath, ComponentId, FilterComparator, IdempotencyKey, ProjectId,
2014        ShardId, StringFilterComparator, TargetWorkerId, Timestamp, WorkerFilter, WorkerId,
2015        WorkerMetadata, WorkerStatus, WorkerStatusRecord,
2016    };
2017    use bincode::{Decode, Encode};
2018
2019    use rand::{rng, Rng};
2020    use serde::{Deserialize, Serialize};
2021
2022    #[test]
2023    fn timestamp_conversion() {
2024        let ts: Timestamp = Timestamp::now_utc();
2025
2026        let prost_ts: prost_types::Timestamp = ts.into();
2027
2028        let ts2: Timestamp = prost_ts.into();
2029
2030        assert_eq!(ts2, ts);
2031    }
2032
2033    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
2034    struct ExampleWithAccountId {
2035        account_id: AccountId,
2036    }
2037
2038    #[test]
2039    fn account_id_from_json_apigateway_version() {
2040        let json = "{ \"account_id\": \"account-1\" }";
2041        let example: ExampleWithAccountId = serde_json::from_str(json).unwrap();
2042        assert_eq!(
2043            example.account_id,
2044            AccountId {
2045                value: "account-1".to_string()
2046            }
2047        );
2048    }
2049
2050    #[test]
2051    fn account_id_json_serialization() {
2052        // We want to use this variant for serialization because it is used on the public API gateway API
2053        let example: ExampleWithAccountId = ExampleWithAccountId {
2054            account_id: AccountId {
2055                value: "account-1".to_string(),
2056            },
2057        };
2058        let json = serde_json::to_string(&example).unwrap();
2059        assert_eq!(json, "{\"account_id\":\"account-1\"}");
2060    }
2061
2062    #[test]
2063    fn worker_filter_parse() {
2064        assert_eq!(
2065            WorkerFilter::from_str(" name =  worker-1").unwrap(),
2066            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2067        );
2068
2069        assert_eq!(
2070            WorkerFilter::from_str("status == Running").unwrap(),
2071            WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2072        );
2073
2074        assert_eq!(
2075            WorkerFilter::from_str("version >= 10").unwrap(),
2076            WorkerFilter::new_version(FilterComparator::GreaterEqual, 10)
2077        );
2078
2079        assert_eq!(
2080            WorkerFilter::from_str("env.tag1 == abc ").unwrap(),
2081            WorkerFilter::new_env(
2082                "tag1".to_string(),
2083                StringFilterComparator::Equal,
2084                "abc".to_string(),
2085            )
2086        );
2087    }
2088
2089    #[test]
2090    fn worker_filter_combination() {
2091        assert_eq!(
2092            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).not(),
2093            WorkerFilter::new_not(WorkerFilter::new_name(
2094                StringFilterComparator::Equal,
2095                "worker-1".to_string(),
2096            ))
2097        );
2098
2099        assert_eq!(
2100            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).and(
2101                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2102            ),
2103            WorkerFilter::new_and(vec![
2104                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2105                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2106            ])
2107        );
2108
2109        assert_eq!(
2110            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2111                .and(WorkerFilter::new_status(
2112                    FilterComparator::Equal,
2113                    WorkerStatus::Running,
2114                ))
2115                .and(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2116            WorkerFilter::new_and(vec![
2117                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2118                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2119                WorkerFilter::new_version(FilterComparator::Equal, 1),
2120            ])
2121        );
2122
2123        assert_eq!(
2124            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()).or(
2125                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running)
2126            ),
2127            WorkerFilter::new_or(vec![
2128                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2129                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running),
2130            ])
2131        );
2132
2133        assert_eq!(
2134            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2135                .or(WorkerFilter::new_status(
2136                    FilterComparator::NotEqual,
2137                    WorkerStatus::Running,
2138                ))
2139                .or(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2140            WorkerFilter::new_or(vec![
2141                WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2142                WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2143                WorkerFilter::new_version(FilterComparator::Equal, 1),
2144            ])
2145        );
2146
2147        assert_eq!(
2148            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2149                .and(WorkerFilter::new_status(
2150                    FilterComparator::NotEqual,
2151                    WorkerStatus::Running,
2152                ))
2153                .or(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2154            WorkerFilter::new_or(vec![
2155                WorkerFilter::new_and(vec![
2156                    WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2157                    WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2158                ]),
2159                WorkerFilter::new_version(FilterComparator::Equal, 1),
2160            ])
2161        );
2162
2163        assert_eq!(
2164            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2165                .or(WorkerFilter::new_status(
2166                    FilterComparator::NotEqual,
2167                    WorkerStatus::Running,
2168                ))
2169                .and(WorkerFilter::new_version(FilterComparator::Equal, 1)),
2170            WorkerFilter::new_and(vec![
2171                WorkerFilter::new_or(vec![
2172                    WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string()),
2173                    WorkerFilter::new_status(FilterComparator::NotEqual, WorkerStatus::Running),
2174                ]),
2175                WorkerFilter::new_version(FilterComparator::Equal, 1),
2176            ])
2177        );
2178    }
2179
2180    #[test]
2181    fn worker_filter_matches() {
2182        let component_id = ComponentId::new_v4();
2183        let worker_metadata = WorkerMetadata {
2184            worker_id: WorkerId {
2185                worker_name: "worker-1".to_string(),
2186                component_id,
2187            },
2188            args: vec![],
2189            env: vec![
2190                ("env1".to_string(), "value1".to_string()),
2191                ("env2".to_string(), "value2".to_string()),
2192            ],
2193            project_id: ProjectId::new_v4(),
2194            created_by: AccountId {
2195                value: "account-1".to_string(),
2196            },
2197            created_at: Timestamp::now_utc(),
2198            parent: None,
2199            last_known_status: WorkerStatusRecord {
2200                component_version: 1,
2201                ..WorkerStatusRecord::default()
2202            },
2203        };
2204
2205        assert!(
2206            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2207                .and(WorkerFilter::new_status(
2208                    FilterComparator::Equal,
2209                    WorkerStatus::Idle,
2210                ))
2211                .matches(&worker_metadata)
2212        );
2213
2214        assert!(WorkerFilter::new_env(
2215            "env1".to_string(),
2216            StringFilterComparator::Equal,
2217            "value1".to_string(),
2218        )
2219        .and(WorkerFilter::new_status(
2220            FilterComparator::Equal,
2221            WorkerStatus::Idle,
2222        ))
2223        .matches(&worker_metadata));
2224
2225        assert!(WorkerFilter::new_env(
2226            "env1".to_string(),
2227            StringFilterComparator::Equal,
2228            "value2".to_string(),
2229        )
2230        .not()
2231        .and(
2232            WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Running).or(
2233                WorkerFilter::new_status(FilterComparator::Equal, WorkerStatus::Idle)
2234            )
2235        )
2236        .matches(&worker_metadata));
2237
2238        assert!(
2239            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-1".to_string())
2240                .and(WorkerFilter::new_version(FilterComparator::Equal, 1))
2241                .matches(&worker_metadata)
2242        );
2243
2244        assert!(
2245            WorkerFilter::new_name(StringFilterComparator::Equal, "worker-2".to_string())
2246                .or(WorkerFilter::new_version(FilterComparator::Equal, 1))
2247                .matches(&worker_metadata)
2248        );
2249
2250        assert!(WorkerFilter::new_version(FilterComparator::GreaterEqual, 1)
2251            .and(WorkerFilter::new_version(FilterComparator::Less, 2))
2252            .or(WorkerFilter::new_name(
2253                StringFilterComparator::Equal,
2254                "worker-2".to_string(),
2255            ))
2256            .matches(&worker_metadata));
2257    }
2258
2259    #[test]
2260    fn target_worker_id_force_shards() {
2261        let mut rng = rng();
2262        const SHARD_COUNT: usize = 1000;
2263        const EXAMPLE_COUNT: usize = 1000;
2264        for _ in 0..EXAMPLE_COUNT {
2265            let mut shard_ids = HashSet::new();
2266            let count = rng.random_range(0..100);
2267            for _ in 0..count {
2268                let shard_id = rng.random_range(0..SHARD_COUNT);
2269                shard_ids.insert(ShardId {
2270                    value: shard_id as i64,
2271                });
2272            }
2273
2274            let component_id = ComponentId::new_v4();
2275            let target_worker_id = TargetWorkerId {
2276                component_id,
2277                worker_name: None,
2278            };
2279
2280            let start = SystemTime::now();
2281            let worker_id = target_worker_id.into_worker_id(&shard_ids, SHARD_COUNT);
2282            let end = SystemTime::now();
2283            info!(
2284                "Time with {count} valid shards: {:?}",
2285                end.duration_since(start).unwrap()
2286            );
2287
2288            if !shard_ids.is_empty() {
2289                assert!(shard_ids.contains(&ShardId::from_worker_id(&worker_id, SHARD_COUNT)));
2290            }
2291        }
2292    }
2293
2294    #[test]
2295    fn derived_idempotency_key() {
2296        let base1 = IdempotencyKey::fresh();
2297        let base2 = IdempotencyKey::fresh();
2298        let base3 = IdempotencyKey {
2299            value: "base3".to_string(),
2300        };
2301
2302        assert_ne!(base1, base2);
2303
2304        let idx1 = OplogIndex::from_u64(2);
2305        let idx2 = OplogIndex::from_u64(11);
2306
2307        let derived11a = IdempotencyKey::derived(&base1, idx1);
2308        let derived12a = IdempotencyKey::derived(&base1, idx2);
2309        let derived21a = IdempotencyKey::derived(&base2, idx1);
2310        let derived22a = IdempotencyKey::derived(&base2, idx2);
2311
2312        let derived11b = IdempotencyKey::derived(&base1, idx1);
2313        let derived12b = IdempotencyKey::derived(&base1, idx2);
2314        let derived21b = IdempotencyKey::derived(&base2, idx1);
2315        let derived22b = IdempotencyKey::derived(&base2, idx2);
2316
2317        let derived31 = IdempotencyKey::derived(&base3, idx1);
2318        let derived32 = IdempotencyKey::derived(&base3, idx2);
2319
2320        assert_eq!(derived11a, derived11b);
2321        assert_eq!(derived12a, derived12b);
2322        assert_eq!(derived21a, derived21b);
2323        assert_eq!(derived22a, derived22b);
2324
2325        assert_ne!(derived11a, derived12a);
2326        assert_ne!(derived11a, derived21a);
2327        assert_ne!(derived11a, derived22a);
2328        assert_ne!(derived12a, derived21a);
2329        assert_ne!(derived12a, derived22a);
2330        assert_ne!(derived21a, derived22a);
2331
2332        assert_ne!(derived11a, derived31);
2333        assert_ne!(derived21a, derived31);
2334        assert_ne!(derived12a, derived32);
2335        assert_ne!(derived22a, derived32);
2336        assert_ne!(derived31, derived32);
2337    }
2338
2339    #[test]
2340    fn initial_component_file_path_from_absolute() {
2341        let path = ComponentFilePath::from_abs_str("/a/b/c").unwrap();
2342        assert_eq!(path.to_string(), "/a/b/c");
2343    }
2344
2345    #[test]
2346    fn initial_component_file_path_from_relative() {
2347        let path = ComponentFilePath::from_abs_str("a/b/c");
2348        assert!(path.is_err());
2349    }
2350}