golem_common/model/public_oplog/
mod.rs

1// Copyright 2024-2025 Golem Cloud
2//
3// Licensed under the Golem Source License v1.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://license.golem.cloud/LICENSE
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "protobuf")]
16mod protobuf;
17
18#[cfg(test)]
19mod tests;
20
21use super::plugin::PluginDefinition;
22use crate::model::invocation_context::{AttributeValue, SpanId, TraceId};
23use crate::model::lucene::{LeafQuery, Query};
24use crate::model::oplog::{
25    DurableFunctionType, LogLevel, OplogIndex, PersistenceLevel, WorkerResourceId,
26};
27use crate::model::plugin::PluginInstallation;
28use crate::model::regions::OplogRegion;
29use crate::model::RetryConfig;
30use crate::model::{
31    AccountId, ComponentVersion, Empty, IdempotencyKey, PluginInstallationId, Timestamp, WorkerId,
32};
33use golem_wasm_ast::analysis::analysed_type::{field, list, option, record, str};
34use golem_wasm_ast::analysis::{AnalysedType, NameOptionTypePair};
35use golem_wasm_rpc::{IntoValue, IntoValueAndType, Value, ValueAndType, WitValue};
36use golem_wasm_rpc_derive::IntoValue;
37use serde::{Deserialize, Serialize};
38use std::collections::{BTreeMap, BTreeSet};
39use std::fmt;
40use std::fmt::{Display, Formatter};
41use std::time::Duration;
42
43#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
44#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
45#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
46#[serde(rename_all = "camelCase")]
47#[wit_transparent]
48pub struct SnapshotBasedUpdateParameters {
49    pub payload: Vec<u8>,
50}
51
52#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
53#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
54#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
55#[serde(tag = "type")]
56pub enum PublicUpdateDescription {
57    #[unit_case]
58    Automatic(Empty),
59    SnapshotBased(SnapshotBasedUpdateParameters),
60}
61
62#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
63#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
64#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
65#[serde(rename_all = "camelCase")]
66#[wit_transparent]
67pub struct WriteRemoteBatchedParameters {
68    pub index: Option<OplogIndex>,
69}
70
71#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
72#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
73#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
74#[serde(tag = "type")]
75pub enum PublicDurableFunctionType {
76    /// The side-effect reads from the worker's local state (for example local file system,
77    /// random generator, etc.)
78    #[unit_case]
79    ReadLocal(Empty),
80    /// The side-effect writes to the worker's local state (for example local file system)
81    #[unit_case]
82    WriteLocal(Empty),
83    /// The side-effect reads from external state (for example a key-value store)
84    #[unit_case]
85    ReadRemote(Empty),
86    /// The side-effect manipulates external state (for example an RPC call)
87    #[unit_case]
88    WriteRemote(Empty),
89    /// The side-effect manipulates external state through multiple invoked functions (for example
90    /// a HTTP request where reading the response involves multiple host function calls)
91    ///
92    /// On the first invocation of the batch, the parameter should be `None` - this triggers
93    /// writing a `BeginRemoteWrite` entry in the oplog. Followup invocations should contain
94    /// this entry's index as the parameter. In batched remote writes it is the caller's responsibility
95    /// to manually write an `EndRemoteWrite` entry (using `end_function`) when the operation is completed.
96    WriteRemoteBatched(WriteRemoteBatchedParameters),
97}
98
99impl From<DurableFunctionType> for PublicDurableFunctionType {
100    fn from(function_type: DurableFunctionType) -> Self {
101        match function_type {
102            DurableFunctionType::ReadLocal => PublicDurableFunctionType::ReadLocal(Empty {}),
103            DurableFunctionType::WriteLocal => PublicDurableFunctionType::WriteLocal(Empty {}),
104            DurableFunctionType::ReadRemote => PublicDurableFunctionType::ReadRemote(Empty {}),
105            DurableFunctionType::WriteRemote => PublicDurableFunctionType::WriteRemote(Empty {}),
106            DurableFunctionType::WriteRemoteBatched(index) => {
107                PublicDurableFunctionType::WriteRemoteBatched(WriteRemoteBatchedParameters {
108                    index,
109                })
110            }
111        }
112    }
113}
114
115#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
116#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
117#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
118#[serde(rename_all = "camelCase")]
119pub struct DetailsParameter {
120    pub details: String,
121}
122
123#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
124#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
125#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
126#[serde(rename_all = "camelCase")]
127pub struct PublicRetryConfig {
128    pub max_attempts: u32,
129    #[serde(with = "humantime_serde")]
130    pub min_delay: Duration,
131    #[serde(with = "humantime_serde")]
132    pub max_delay: Duration,
133    pub multiplier: f64,
134    pub max_jitter_factor: Option<f64>,
135}
136
137impl From<RetryConfig> for PublicRetryConfig {
138    fn from(retry_config: RetryConfig) -> Self {
139        PublicRetryConfig {
140            max_attempts: retry_config.max_attempts,
141            min_delay: retry_config.min_delay,
142            max_delay: retry_config.max_delay,
143            multiplier: retry_config.multiplier,
144            max_jitter_factor: retry_config.max_jitter_factor,
145        }
146    }
147}
148
149#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
150#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
151#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
152#[serde(rename_all = "camelCase")]
153pub struct ExportedFunctionParameters {
154    pub idempotency_key: IdempotencyKey,
155    pub full_function_name: String,
156    pub function_input: Option<Vec<ValueAndType>>,
157    pub trace_id: TraceId,
158    pub trace_states: Vec<String>,
159    pub invocation_context: Vec<Vec<PublicSpanData>>,
160}
161
162impl IntoValue for ExportedFunctionParameters {
163    fn into_value(self) -> Value {
164        let wit_values: Option<Vec<WitValue>> = self
165            .function_input
166            .map(|inputs| inputs.into_iter().map(Into::into).collect());
167        Value::Record(vec![
168            self.idempotency_key.into_value(),
169            self.full_function_name.into_value(),
170            wit_values.into_value(),
171        ])
172    }
173
174    fn get_type() -> AnalysedType {
175        record(vec![
176            field("idempotency-key", IdempotencyKey::get_type()),
177            field("function-name", str()),
178            field("input", option(list(WitValue::get_type()))),
179        ])
180    }
181}
182
183#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
184#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
185#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
186#[serde(rename_all = "camelCase")]
187#[wit_transparent]
188pub struct ManualUpdateParameters {
189    pub target_version: ComponentVersion,
190}
191
192#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
193#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
194#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
195#[serde(tag = "type")]
196pub enum PublicWorkerInvocation {
197    ExportedFunction(ExportedFunctionParameters),
198    ManualUpdate(ManualUpdateParameters),
199}
200
201#[derive(Clone, Debug, Serialize, PartialEq, Eq, PartialOrd, Ord, Deserialize, IntoValue)]
202#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
203#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
204#[serde(rename_all = "camelCase")]
205pub struct PluginInstallationDescription {
206    pub installation_id: PluginInstallationId,
207    pub plugin_name: String,
208    pub plugin_version: String,
209    pub registered: bool,
210    pub parameters: BTreeMap<String, String>,
211}
212
213impl PluginInstallationDescription {
214    pub fn from_definition_and_installation(
215        definition: PluginDefinition,
216        installation: PluginInstallation,
217    ) -> Self {
218        Self {
219            installation_id: installation.id,
220            plugin_name: definition.name,
221            plugin_version: definition.version,
222            parameters: installation.parameters.into_iter().collect(),
223            registered: !definition.deleted,
224        }
225    }
226}
227
228#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
229#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
230#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
231#[serde(rename_all = "camelCase")]
232pub struct CreateParameters {
233    pub timestamp: Timestamp,
234    pub worker_id: WorkerId,
235    pub component_version: ComponentVersion,
236    pub args: Vec<String>,
237    pub env: BTreeMap<String, String>,
238    pub account_id: AccountId,
239    pub parent: Option<WorkerId>,
240    pub component_size: u64,
241    pub initial_total_linear_memory_size: u64,
242    pub initial_active_plugins: BTreeSet<PluginInstallationDescription>,
243}
244
245#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
246#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
247#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
248#[serde(rename_all = "camelCase")]
249pub struct ImportedFunctionInvokedParameters {
250    pub timestamp: Timestamp,
251    pub function_name: String,
252    #[wit_field(convert = WitValue)]
253    pub request: ValueAndType,
254    #[wit_field(convert = WitValue)]
255    pub response: ValueAndType,
256    pub wrapped_function_type: PublicDurableFunctionType, // TODO: rename in Golem 2.0
257}
258
259#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, IntoValue)]
260#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
261#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
262#[serde(rename_all = "camelCase")]
263#[wit_transparent]
264pub struct StringAttributeValue {
265    pub value: String,
266}
267
268#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, IntoValue)]
269#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
270#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
271#[serde(tag = "type")]
272pub enum PublicAttributeValue {
273    String(StringAttributeValue),
274}
275
276impl From<AttributeValue> for PublicAttributeValue {
277    fn from(value: AttributeValue) -> Self {
278        match value {
279            AttributeValue::String(value) => {
280                PublicAttributeValue::String(StringAttributeValue { value })
281            }
282        }
283    }
284}
285
286#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
287#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
288#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
289#[serde(rename_all = "camelCase")]
290pub struct PublicLocalSpanData {
291    pub span_id: SpanId,
292    pub start: Timestamp,
293    pub parent_id: Option<SpanId>,
294    pub linked_context: Option<u64>,
295    pub attributes: Vec<PublicAttribute>,
296    pub inherited: bool,
297}
298
299#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
300#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
301#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
302#[serde(rename_all = "camelCase")]
303pub struct PublicAttribute {
304    pub key: String,
305    pub value: PublicAttributeValue,
306}
307
308#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
309#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
310#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
311#[serde(rename_all = "camelCase")]
312pub struct PublicExternalSpanData {
313    pub span_id: SpanId,
314}
315
316#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, IntoValue)]
317#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
318#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
319#[serde(tag = "type")]
320pub enum PublicSpanData {
321    LocalSpan(PublicLocalSpanData),
322    ExternalSpan(PublicExternalSpanData),
323}
324
325impl PublicSpanData {
326    pub fn span_id(&self) -> &SpanId {
327        match self {
328            PublicSpanData::LocalSpan(data) => &data.span_id,
329            PublicSpanData::ExternalSpan(data) => &data.span_id,
330        }
331    }
332}
333
334#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
335#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
336#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
337#[serde(rename_all = "camelCase")]
338pub struct ExportedFunctionInvokedParameters {
339    pub timestamp: Timestamp,
340    pub function_name: String,
341    #[wit_field(convert_vec = WitValue)]
342    pub request: Vec<ValueAndType>,
343    pub idempotency_key: IdempotencyKey,
344    pub trace_id: TraceId,
345    pub trace_states: Vec<String>,
346    pub invocation_context: Vec<Vec<PublicSpanData>>,
347}
348
349#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
350#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
351#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
352#[serde(rename_all = "camelCase")]
353pub struct ExportedFunctionCompletedParameters {
354    pub timestamp: Timestamp,
355    #[wit_field(convert_option = WitValue)]
356    pub response: Option<ValueAndType>,
357    pub consumed_fuel: i64,
358}
359
360#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
361#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
362#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
363#[serde(rename_all = "camelCase")]
364pub struct TimestampParameter {
365    pub timestamp: Timestamp,
366}
367
368#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
369#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
370#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
371#[serde(rename_all = "camelCase")]
372pub struct ErrorParameters {
373    pub timestamp: Timestamp,
374    pub error: String,
375}
376
377#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
378#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
379#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
380#[serde(rename_all = "camelCase")]
381pub struct JumpParameters {
382    pub timestamp: Timestamp,
383    pub jump: OplogRegion,
384}
385
386impl IntoValue for JumpParameters {
387    fn into_value(self) -> Value {
388        Value::Record(vec![
389            self.timestamp.into_value(),
390            self.jump.start.into_value(),
391            self.jump.end.into_value(),
392        ])
393    }
394
395    fn get_type() -> AnalysedType {
396        record(vec![
397            field("timestamp", Timestamp::get_type()),
398            field("start", OplogIndex::get_type()),
399            field("end", OplogIndex::get_type()),
400        ])
401    }
402}
403
404#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
405#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
406#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
407#[serde(rename_all = "camelCase")]
408pub struct ChangeRetryPolicyParameters {
409    pub timestamp: Timestamp,
410    pub new_policy: PublicRetryConfig,
411}
412
413#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
414#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
415#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
416#[serde(rename_all = "camelCase")]
417pub struct EndRegionParameters {
418    pub timestamp: Timestamp,
419    pub begin_index: OplogIndex,
420}
421
422#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
423#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
424pub struct PendingWorkerInvocationParameters {
425    pub timestamp: Timestamp,
426    pub invocation: PublicWorkerInvocation,
427}
428
429#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
430#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
431pub struct PendingUpdateParameters {
432    pub timestamp: Timestamp,
433    pub target_version: ComponentVersion,
434    pub description: PublicUpdateDescription,
435}
436
437#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
438#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
439pub struct SuccessfulUpdateParameters {
440    pub timestamp: Timestamp,
441    pub target_version: ComponentVersion,
442    pub new_component_size: u64,
443    pub new_active_plugins: BTreeSet<PluginInstallationDescription>,
444}
445
446#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
447#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
448#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
449#[serde(rename_all = "camelCase")]
450pub struct FailedUpdateParameters {
451    pub timestamp: Timestamp,
452    pub target_version: ComponentVersion,
453    pub details: Option<String>,
454}
455
456#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
457#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
458#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
459#[serde(rename_all = "camelCase")]
460pub struct GrowMemoryParameters {
461    pub timestamp: Timestamp,
462    pub delta: u64,
463}
464
465#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
466#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
467#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
468#[serde(rename_all = "camelCase")]
469pub struct ResourceParameters {
470    pub timestamp: Timestamp,
471    pub id: WorkerResourceId,
472}
473
474#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
475#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
476#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
477#[serde(rename_all = "camelCase")]
478pub struct DescribeResourceParameters {
479    pub timestamp: Timestamp,
480    pub id: WorkerResourceId,
481    pub resource_name: String,
482    #[wit_field(convert_vec = WitValue)]
483    pub resource_params: Vec<ValueAndType>,
484}
485
486#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
487#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
488#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
489#[serde(rename_all = "camelCase")]
490pub struct LogParameters {
491    pub timestamp: Timestamp,
492    pub level: LogLevel,
493    pub context: String,
494    pub message: String,
495}
496
497#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
498#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
499#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
500#[serde(rename_all = "camelCase")]
501pub struct ActivatePluginParameters {
502    pub timestamp: Timestamp,
503    pub plugin: PluginInstallationDescription,
504}
505
506#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
507#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
508#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
509#[serde(rename_all = "camelCase")]
510pub struct DeactivatePluginParameters {
511    pub timestamp: Timestamp,
512    pub plugin: PluginInstallationDescription,
513}
514
515#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
516#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
517#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
518#[serde(rename_all = "camelCase")]
519pub struct RevertParameters {
520    pub timestamp: Timestamp,
521    pub dropped_region: OplogRegion,
522}
523
524impl IntoValue for RevertParameters {
525    fn into_value(self) -> Value {
526        Value::Record(vec![
527            self.timestamp.into_value(),
528            self.dropped_region.start.into_value(),
529            self.dropped_region.end.into_value(),
530        ])
531    }
532
533    fn get_type() -> AnalysedType {
534        record(vec![
535            field("timestamp", Timestamp::get_type()),
536            field("start", OplogIndex::get_type()),
537            field("end", OplogIndex::get_type()),
538        ])
539    }
540}
541
542#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
543#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
544#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
545#[serde(rename_all = "camelCase")]
546pub struct CancelInvocationParameters {
547    pub timestamp: Timestamp,
548    pub idempotency_key: IdempotencyKey,
549}
550
551#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
552#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
553#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
554#[serde(rename_all = "camelCase")]
555pub struct StartSpanParameters {
556    pub timestamp: Timestamp,
557    pub span_id: SpanId,
558    pub parent_id: Option<SpanId>,
559    pub linked_context: Option<SpanId>,
560    pub attributes: Vec<PublicAttribute>,
561}
562
563#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
564#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
565#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
566#[serde(rename_all = "camelCase")]
567pub struct FinishSpanParameters {
568    pub timestamp: Timestamp,
569    pub span_id: SpanId,
570}
571
572#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
573#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
574#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
575#[serde(rename_all = "camelCase")]
576pub struct SetSpanAttributeParameters {
577    pub timestamp: Timestamp,
578    pub span_id: SpanId,
579    pub key: String,
580    pub value: PublicAttributeValue,
581}
582
583#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, IntoValue)]
584#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
585#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
586#[serde(rename_all = "camelCase")]
587pub struct ChangePersistenceLevelParameters {
588    pub timestamp: Timestamp,
589    pub persistence_level: PersistenceLevel,
590}
591
592/// A mirror of the core `OplogEntry` type, without the undefined arbitrary payloads.
593///
594/// Instead, it encodes all payloads with wasm-rpc `Value` types. This makes this the base type
595/// for exposing oplog entries through various APIs such as gRPC, REST and WIT.
596///
597/// The rest of the system will always use `OplogEntry` internally - the only point where the
598/// oplog payloads are decoded and re-encoded as `Value` is in this module, and it should only be used
599/// before exposing an oplog entry through a public API.
600#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
601#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
602#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
603#[serde(tag = "type")]
604pub enum PublicOplogEntry {
605    Create(CreateParameters),
606    /// The worker invoked a host function
607    ImportedFunctionInvoked(ImportedFunctionInvokedParameters),
608    /// The worker has been invoked
609    ExportedFunctionInvoked(ExportedFunctionInvokedParameters),
610    /// The worker has completed an invocation
611    ExportedFunctionCompleted(ExportedFunctionCompletedParameters),
612    /// Worker suspended
613    Suspend(TimestampParameter),
614    /// Worker failed
615    Error(ErrorParameters),
616    /// Marker entry added when get-oplog-index is called from the worker, to make the jumping behavior
617    /// more predictable.
618    NoOp(TimestampParameter),
619    /// The worker needs to recover up to the given target oplog index and continue running from
620    /// the source oplog index from there
621    /// `jump` is an oplog region representing that from the end of that region we want to go back to the start and
622    /// ignore all recorded operations in between.
623    Jump(JumpParameters),
624    /// Indicates that the worker has been interrupted at this point.
625    /// Only used to recompute the worker's (cached) status, has no effect on execution.
626    Interrupted(TimestampParameter),
627    /// Indicates that the worker has been exited using WASI's exit function.
628    Exited(TimestampParameter),
629    /// Overrides the worker's retry policy
630    ChangeRetryPolicy(ChangeRetryPolicyParameters),
631    /// Begins an atomic region. All oplog entries after `BeginAtomicRegion` are to be ignored during
632    /// recovery except if there is a corresponding `EndAtomicRegion` entry.
633    BeginAtomicRegion(TimestampParameter),
634    /// Ends an atomic region. All oplog entries between the corresponding `BeginAtomicRegion` and this
635    /// entry are to be considered during recovery, and the begin/end markers can be removed during oplog
636    /// compaction.
637    EndAtomicRegion(EndRegionParameters),
638    /// Begins a remote write operation. Only used when idempotence mode is off. In this case each
639    /// remote write must be surrounded by a `BeginRemoteWrite` and `EndRemoteWrite` log pair and
640    /// unfinished remote writes cannot be recovered.
641    BeginRemoteWrite(TimestampParameter),
642    /// Marks the end of a remote write operation. Only used when idempotence mode is off.
643    EndRemoteWrite(EndRegionParameters),
644    /// An invocation request arrived while the worker was busy
645    PendingWorkerInvocation(PendingWorkerInvocationParameters),
646    /// An update request arrived and will be applied as soon the worker restarts
647    PendingUpdate(PendingUpdateParameters),
648    /// An update was successfully applied
649    SuccessfulUpdate(SuccessfulUpdateParameters),
650    /// An update failed to be applied
651    FailedUpdate(FailedUpdateParameters),
652    /// Increased total linear memory size
653    GrowMemory(GrowMemoryParameters),
654    /// Created a resource instance
655    CreateResource(ResourceParameters),
656    /// Dropped a resource instance
657    DropResource(ResourceParameters),
658    /// Adds additional information for a created resource instance
659    DescribeResource(DescribeResourceParameters),
660    /// The worker emitted a log message
661    Log(LogParameters),
662    /// Marks the point where the worker was restarted from clean initial state
663    Restart(TimestampParameter),
664    /// Activates a plugin
665    ActivatePlugin(ActivatePluginParameters),
666    /// Deactivates a plugin
667    DeactivatePlugin(DeactivatePluginParameters),
668    /// Revert a worker to a previous state
669    Revert(RevertParameters),
670    /// Cancel a pending invocation
671    CancelInvocation(CancelInvocationParameters),
672    /// Start a new span in the invocation context
673    StartSpan(StartSpanParameters),
674    /// Finish an open span in the invocation context
675    FinishSpan(FinishSpanParameters),
676    /// Set an attribute on an open span in the invocation context
677    SetSpanAttribute(SetSpanAttributeParameters),
678    /// Change the current persistence level
679    ChangePersistenceLevel(ChangePersistenceLevelParameters),
680}
681
682impl PublicOplogEntry {
683    pub fn matches(&self, query: &Query) -> bool {
684        fn matches_impl(entry: &PublicOplogEntry, query: &Query, field_stack: &[String]) -> bool {
685            match query {
686                Query::Or { queries } => queries
687                    .iter()
688                    .any(|query| matches_impl(entry, query, field_stack)),
689                Query::And { queries } => queries
690                    .iter()
691                    .all(|query| matches_impl(entry, query, field_stack)),
692                Query::Not { query } => !matches_impl(entry, query, field_stack),
693                Query::Regex { .. } => {
694                    entry.matches_leaf_query(field_stack, &query.clone().try_into().unwrap())
695                }
696                Query::Term { .. } => {
697                    entry.matches_leaf_query(field_stack, &query.clone().try_into().unwrap())
698                }
699                Query::Phrase { .. } => {
700                    entry.matches_leaf_query(field_stack, &query.clone().try_into().unwrap())
701                }
702                Query::Field { field, query } => {
703                    let mut new_stack: Vec<String> = field_stack.to_vec();
704                    let parts: Vec<String> = field.split(".").map(|s| s.to_string()).collect();
705                    new_stack.extend(parts);
706                    matches_impl(entry, query, &new_stack)
707                }
708            }
709        }
710
711        matches_impl(self, query, &[])
712    }
713
714    fn string_match(s: &str, path: &[String], query_path: &[String], query: &LeafQuery) -> bool {
715        let lowercase_path = path
716            .iter()
717            .map(|s| s.to_lowercase())
718            .collect::<Vec<String>>();
719        let lowercase_query_path = query_path
720            .iter()
721            .map(|s| s.to_lowercase())
722            .collect::<Vec<String>>();
723        if lowercase_path == lowercase_query_path || query_path.is_empty() {
724            query.matches(s)
725        } else {
726            false
727        }
728    }
729
730    fn span_attribute_match(
731        attributes: &Vec<PublicAttribute>,
732        path_stack: &[String],
733        query_path: &[String],
734        query: &LeafQuery,
735    ) -> bool {
736        for attr in attributes {
737            let key = &attr.key;
738            let value = &attr.value;
739            let mut new_path: Vec<String> = path_stack.to_vec();
740            new_path.push(key.clone());
741
742            let vnt = match value {
743                PublicAttributeValue::String(StringAttributeValue { value }) => {
744                    value.clone().into_value_and_type()
745                }
746            };
747
748            if Self::match_value(&vnt, &new_path, query_path, query) {
749                return true;
750            }
751        }
752        false
753    }
754
755    fn matches_leaf_query(&self, query_path: &[String], query: &LeafQuery) -> bool {
756        match self {
757            PublicOplogEntry::Create(_params) => {
758                Self::string_match("create", &[], query_path, query)
759            }
760            PublicOplogEntry::ImportedFunctionInvoked(params) => {
761                Self::string_match("importedfunctioninvoked", &[], query_path, query)
762                    || Self::string_match("imported-function-invoked", &[], query_path, query)
763                    || Self::string_match("imported-function", &[], query_path, query)
764                    || Self::string_match(&params.function_name, &[], query_path, query)
765                    || Self::match_value(&params.request, &[], query_path, query)
766                    || Self::match_value(&params.response, &[], query_path, query)
767            }
768            PublicOplogEntry::ExportedFunctionInvoked(params) => {
769                Self::string_match("exportedfunctioninvoked", &[], query_path, query)
770                    || Self::string_match("exported-function-invoked", &[], query_path, query)
771                    || Self::string_match("exported-function", &[], query_path, query)
772                    || Self::string_match(&params.function_name, &[], query_path, query)
773                    || params
774                        .request
775                        .iter()
776                        .any(|v| Self::match_value(v, &[], query_path, query))
777                    || Self::string_match(&params.idempotency_key.value, &[], query_path, query)
778            }
779            PublicOplogEntry::ExportedFunctionCompleted(params) => {
780                Self::string_match("exportedfunctioncompleted", &[], query_path, query)
781                    || Self::string_match("exported-function-completed", &[], query_path, query)
782                    || Self::string_match("exported-function", &[], query_path, query)
783                    || match &params.response {
784                        Some(response) => Self::match_value(response, &[], query_path, query),
785                        None => false,
786                    }
787                // TODO: should we store function name and idempotency key in ExportedFunctionCompleted?
788            }
789            PublicOplogEntry::Suspend(_params) => {
790                Self::string_match("suspend", &[], query_path, query)
791            }
792            PublicOplogEntry::Error(params) => {
793                Self::string_match("error", &[], query_path, query)
794                    || Self::string_match(&params.error, &[], query_path, query)
795            }
796            PublicOplogEntry::NoOp(_params) => Self::string_match("noop", &[], query_path, query),
797            PublicOplogEntry::Jump(_params) => Self::string_match("jump", &[], query_path, query),
798            PublicOplogEntry::Interrupted(_params) => {
799                Self::string_match("interrupted", &[], query_path, query)
800            }
801            PublicOplogEntry::Exited(_params) => {
802                Self::string_match("exited", &[], query_path, query)
803            }
804            PublicOplogEntry::ChangeRetryPolicy(_params) => {
805                Self::string_match("changeretrypolicy", &[], query_path, query)
806                    || Self::string_match("change-retry-policy", &[], query_path, query)
807            }
808            PublicOplogEntry::BeginAtomicRegion(_params) => {
809                Self::string_match("beginatomicregion", &[], query_path, query)
810                    || Self::string_match("begin-atomic-region", &[], query_path, query)
811            }
812            PublicOplogEntry::EndAtomicRegion(_params) => {
813                Self::string_match("endatomicregion", &[], query_path, query)
814                    || Self::string_match("end-atomic-region", &[], query_path, query)
815            }
816            PublicOplogEntry::BeginRemoteWrite(_params) => {
817                Self::string_match("beginremotewrite", &[], query_path, query)
818                    || Self::string_match("begin-remote-write", &[], query_path, query)
819            }
820            PublicOplogEntry::EndRemoteWrite(_params) => {
821                Self::string_match("endremotewrite", &[], query_path, query)
822                    || Self::string_match("end-remote-write", &[], query_path, query)
823            }
824            PublicOplogEntry::PendingWorkerInvocation(params) => {
825                Self::string_match("pendingworkerinvocation", &[], query_path, query)
826                    || Self::string_match("pending-worker-invocation", &[], query_path, query)
827                    || match &params.invocation {
828                        PublicWorkerInvocation::ExportedFunction(params) => {
829                            Self::string_match(&params.full_function_name, &[], query_path, query)
830                                || Self::string_match(
831                                    &params.idempotency_key.value,
832                                    &[],
833                                    query_path,
834                                    query,
835                                )
836                                || params
837                                    .function_input
838                                    .as_ref()
839                                    .map(|params| {
840                                        params
841                                            .iter()
842                                            .any(|v| Self::match_value(v, &[], query_path, query))
843                                    })
844                                    .unwrap_or(false)
845                        }
846                        PublicWorkerInvocation::ManualUpdate(params) => Self::string_match(
847                            &params.target_version.to_string(),
848                            &[],
849                            query_path,
850                            query,
851                        ),
852                    }
853            }
854            PublicOplogEntry::PendingUpdate(params) => {
855                Self::string_match("pendingupdate", &[], query_path, query)
856                    || Self::string_match("pending-update", &[], query_path, query)
857                    || Self::string_match("update", &[], query_path, query)
858                    || Self::string_match(
859                        &params.target_version.to_string(),
860                        &[],
861                        query_path,
862                        query,
863                    )
864            }
865            PublicOplogEntry::SuccessfulUpdate(params) => {
866                Self::string_match("successfulupdate", &[], query_path, query)
867                    || Self::string_match("successful-update", &[], query_path, query)
868                    || Self::string_match("update", &[], query_path, query)
869                    || Self::string_match(
870                        &params.target_version.to_string(),
871                        &[],
872                        query_path,
873                        query,
874                    )
875            }
876            PublicOplogEntry::FailedUpdate(params) => {
877                Self::string_match("failedupdate", &[], query_path, query)
878                    || Self::string_match("failed-update", &[], query_path, query)
879                    || Self::string_match("update", &[], query_path, query)
880                    || Self::string_match(
881                        &params.target_version.to_string(),
882                        &[],
883                        query_path,
884                        query,
885                    )
886                    || params
887                        .details
888                        .as_ref()
889                        .map(|details| Self::string_match(details, &[], query_path, query))
890                        .unwrap_or(false)
891            }
892            PublicOplogEntry::GrowMemory(_params) => {
893                Self::string_match("growmemory", &[], query_path, query)
894                    || Self::string_match("grow-memory", &[], query_path, query)
895            }
896            PublicOplogEntry::CreateResource(_params) => {
897                Self::string_match("createresource", &[], query_path, query)
898                    || Self::string_match("create-resource", &[], query_path, query)
899            }
900            PublicOplogEntry::DropResource(_params) => {
901                Self::string_match("dropresource", &[], query_path, query)
902                    || Self::string_match("drop-resource", &[], query_path, query)
903            }
904            PublicOplogEntry::DescribeResource(params) => {
905                Self::string_match("describeresource", &[], query_path, query)
906                    || Self::string_match("describe-resource", &[], query_path, query)
907                    || Self::string_match(&params.resource_name, &[], query_path, query)
908                    || params
909                        .resource_params
910                        .iter()
911                        .any(|v| Self::match_value(v, &[], query_path, query))
912            }
913            PublicOplogEntry::Log(params) => {
914                Self::string_match("log", &[], query_path, query)
915                    || Self::string_match(&params.context, &[], query_path, query)
916                    || Self::string_match(&params.message, &[], query_path, query)
917            }
918            PublicOplogEntry::Restart(_params) => {
919                Self::string_match("restart", &[], query_path, query)
920            }
921            PublicOplogEntry::ActivatePlugin(_params) => {
922                Self::string_match("activateplugin", &[], query_path, query)
923                    || Self::string_match("activate-plugin", &[], query_path, query)
924            }
925            PublicOplogEntry::DeactivatePlugin(_params) => {
926                Self::string_match("deactivateplugin", &[], query_path, query)
927                    || Self::string_match("deactivate-plugin", &[], query_path, query)
928            }
929            PublicOplogEntry::Revert(_params) => {
930                Self::string_match("revert", &[], query_path, query)
931            }
932            PublicOplogEntry::CancelInvocation(params) => {
933                Self::string_match("cancel", &[], query_path, query)
934                    || Self::string_match("cancel-invocation", &[], query_path, query)
935                    || Self::string_match(&params.idempotency_key.value, &[], query_path, query)
936            }
937            PublicOplogEntry::StartSpan(params) => {
938                Self::string_match("startspan", &[], query_path, query)
939                    || Self::string_match("start-span", &[], query_path, query)
940                    || Self::string_match(&params.span_id.to_string(), &[], query_path, query)
941                    || Self::string_match(
942                        &params
943                            .parent_id
944                            .as_ref()
945                            .map(|id| id.to_string())
946                            .unwrap_or_default(),
947                        &[],
948                        query_path,
949                        query,
950                    )
951                    || Self::string_match(
952                        &params
953                            .linked_context
954                            .as_ref()
955                            .map(|id| id.to_string())
956                            .unwrap_or_default(),
957                        &[],
958                        query_path,
959                        query,
960                    )
961                    || Self::span_attribute_match(&params.attributes, &[], query_path, query)
962            }
963            PublicOplogEntry::FinishSpan(params) => {
964                Self::string_match("finishspan", &[], query_path, query)
965                    || Self::string_match("finish-span", &[], query_path, query)
966                    || Self::string_match(&params.span_id.to_string(), &[], query_path, query)
967            }
968            PublicOplogEntry::SetSpanAttribute(params) => {
969                let attributes = vec![PublicAttribute {
970                    key: params.key.clone(),
971                    value: params.value.clone(),
972                }];
973                Self::string_match("setspanattribute", &[], query_path, query)
974                    || Self::string_match("set-span-attribute", &[], query_path, query)
975                    || Self::string_match(&params.key, &[], query_path, query)
976                    || Self::span_attribute_match(&attributes, &[], query_path, query)
977            }
978            PublicOplogEntry::ChangePersistenceLevel(_params) => {
979                Self::string_match("changepersistencelevel", &[], query_path, query)
980                    || Self::string_match("change-persistence-level", &[], query_path, query)
981                    || Self::string_match("persistence-level", &[], query_path, query)
982            }
983        }
984    }
985
986    fn match_value(
987        value: &ValueAndType,
988        path_stack: &[String],
989        query_path: &[String],
990        query: &LeafQuery,
991    ) -> bool {
992        match (&value.value, &value.typ) {
993            (Value::Bool(value), _) => {
994                Self::string_match(&value.to_string(), path_stack, query_path, query)
995            }
996            (Value::U8(value), _) => {
997                Self::string_match(&value.to_string(), path_stack, query_path, query)
998            }
999            (Value::U16(value), _) => {
1000                Self::string_match(&value.to_string(), path_stack, query_path, query)
1001            }
1002            (Value::U32(value), _) => {
1003                Self::string_match(&value.to_string(), path_stack, query_path, query)
1004            }
1005            (Value::U64(value), _) => {
1006                Self::string_match(&value.to_string(), path_stack, query_path, query)
1007            }
1008            (Value::S8(value), _) => {
1009                Self::string_match(&value.to_string(), path_stack, query_path, query)
1010            }
1011            (Value::S16(value), _) => {
1012                Self::string_match(&value.to_string(), path_stack, query_path, query)
1013            }
1014            (Value::S32(value), _) => {
1015                Self::string_match(&value.to_string(), path_stack, query_path, query)
1016            }
1017            (Value::S64(value), _) => {
1018                Self::string_match(&value.to_string(), path_stack, query_path, query)
1019            }
1020            (Value::F32(value), _) => {
1021                Self::string_match(&value.to_string(), path_stack, query_path, query)
1022            }
1023            (Value::F64(value), _) => {
1024                Self::string_match(&value.to_string(), path_stack, query_path, query)
1025            }
1026            (Value::Char(value), _) => {
1027                Self::string_match(&value.to_string(), path_stack, query_path, query)
1028            }
1029            (Value::String(value), _) => {
1030                Self::string_match(&value.to_string(), path_stack, query_path, query)
1031            }
1032            (Value::List(elems), AnalysedType::List(list)) => elems.iter().any(|v| {
1033                Self::match_value(
1034                    &ValueAndType::new(v.clone(), (*list.inner).clone()),
1035                    path_stack,
1036                    query_path,
1037                    query,
1038                )
1039            }),
1040            (Value::Tuple(elems), AnalysedType::Tuple(tuple)) => {
1041                if elems.len() != tuple.items.len() {
1042                    false
1043                } else {
1044                    elems
1045                        .iter()
1046                        .zip(tuple.items.iter())
1047                        .enumerate()
1048                        .any(|(idx, (v, t))| {
1049                            let mut new_path: Vec<String> = path_stack.to_vec();
1050                            new_path.push(idx.to_string());
1051                            Self::match_value(
1052                                &ValueAndType::new(v.clone(), t.clone()),
1053                                &new_path,
1054                                query_path,
1055                                query,
1056                            )
1057                        })
1058                }
1059            }
1060            (Value::Record(fields), AnalysedType::Record(record)) => {
1061                if fields.len() != record.fields.len() {
1062                    false
1063                } else {
1064                    fields.iter().zip(record.fields.iter()).any(|(v, t)| {
1065                        let mut new_path: Vec<String> = path_stack.to_vec();
1066                        new_path.push(t.name.clone());
1067                        Self::match_value(
1068                            &ValueAndType::new(v.clone(), t.typ.clone()),
1069                            &new_path,
1070                            path_stack,
1071                            query,
1072                        )
1073                    })
1074                }
1075            }
1076            (
1077                Value::Variant {
1078                    case_value,
1079                    case_idx,
1080                },
1081                AnalysedType::Variant(variant),
1082            ) => {
1083                let case = variant.cases.get(*case_idx as usize);
1084                match (case_value, case) {
1085                    (
1086                        Some(value),
1087                        Some(NameOptionTypePair {
1088                            typ: Some(typ),
1089                            name,
1090                        }),
1091                    ) => {
1092                        let mut new_path: Vec<String> = path_stack.to_vec();
1093                        new_path.push(name.clone());
1094                        Self::match_value(
1095                            &ValueAndType::new((**value).clone(), typ.clone()),
1096                            &new_path,
1097                            query_path,
1098                            query,
1099                        )
1100                    }
1101                    _ => false,
1102                }
1103            }
1104            (Value::Enum(value), AnalysedType::Enum(typ)) => {
1105                if let Some(case) = typ.cases.get(*value as usize) {
1106                    Self::string_match(case, path_stack, query_path, query)
1107                } else {
1108                    false
1109                }
1110            }
1111            (Value::Flags(bitmap), AnalysedType::Flags(flags)) => {
1112                let names = bitmap
1113                    .iter()
1114                    .enumerate()
1115                    .filter_map(|(idx, set)| if *set { flags.names.get(idx) } else { None })
1116                    .collect::<Vec<_>>();
1117                names
1118                    .iter()
1119                    .any(|name| Self::string_match(name, path_stack, query_path, query))
1120            }
1121            (Value::Option(Some(value)), AnalysedType::Option(typ)) => Self::match_value(
1122                &ValueAndType::new((**value).clone(), (*typ.inner).clone()),
1123                path_stack,
1124                query_path,
1125                query,
1126            ),
1127            (Value::Result(value), AnalysedType::Result(typ)) => match value {
1128                Ok(Some(value)) if typ.ok.is_some() => {
1129                    let mut new_path = path_stack.to_vec();
1130                    new_path.push("ok".to_string());
1131                    Self::match_value(
1132                        &ValueAndType::new(
1133                            (**value).clone(),
1134                            (**(typ.ok.as_ref().unwrap())).clone(),
1135                        ),
1136                        &new_path,
1137                        query_path,
1138                        query,
1139                    )
1140                }
1141                Err(Some(value)) if typ.err.is_some() => {
1142                    let mut new_path = path_stack.to_vec();
1143                    new_path.push("err".to_string());
1144                    Self::match_value(
1145                        &ValueAndType::new(
1146                            (**value).clone(),
1147                            (**(typ.err.as_ref().unwrap())).clone(),
1148                        ),
1149                        &new_path,
1150                        query_path,
1151                        query,
1152                    )
1153                }
1154                _ => false,
1155            },
1156            (Value::Handle { .. }, _) => false,
1157            _ => false,
1158        }
1159    }
1160}
1161
1162#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1163#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
1164#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
1165#[serde(rename_all = "camelCase")]
1166pub struct OplogCursor {
1167    pub next_oplog_index: u64,
1168    pub current_component_version: u64,
1169}
1170
1171#[cfg(feature = "poem")]
1172impl poem_openapi::types::ParseFromParameter for OplogCursor {
1173    fn parse_from_parameter(value: &str) -> poem_openapi::types::ParseResult<Self> {
1174        let parts: Vec<&str> = value.split('-').collect();
1175        if parts.len() != 2 {
1176            return Err("Invalid oplog cursor".into());
1177        }
1178        let next_oplog_index = parts[0]
1179            .parse()
1180            .map_err(|_| "Invalid index in the oplog cursor")?;
1181        let current_component_version = parts[1]
1182            .parse()
1183            .map_err(|_| "Invalid component version in the oplog cursor")?;
1184        Ok(OplogCursor {
1185            next_oplog_index,
1186            current_component_version,
1187        })
1188    }
1189}
1190
1191impl Display for OplogCursor {
1192    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1193        write!(
1194            f,
1195            "{}-{}",
1196            self.next_oplog_index, self.current_component_version
1197        )
1198    }
1199}