1#[cfg(feature = "protobuf")]
16mod protobuf;
17
18#[cfg(test)]
19mod tests;
20
21use super::plugin::PluginDefinition;
22use crate::model::invocation_context::{AttributeValue, SpanId, TraceId};
23use crate::model::lucene::{LeafQuery, Query};
24use crate::model::oplog::{
25 DurableFunctionType, LogLevel, OplogIndex, PersistenceLevel, WorkerResourceId,
26};
27use crate::model::plugin::PluginInstallation;
28use crate::model::regions::OplogRegion;
29use crate::model::RetryConfig;
30use crate::model::{
31 AccountId, ComponentVersion, Empty, IdempotencyKey, PluginInstallationId, Timestamp, WorkerId,
32};
33use golem_wasm_ast::analysis::analysed_type::{field, list, option, record, str};
34use golem_wasm_ast::analysis::{AnalysedType, NameOptionTypePair};
35use golem_wasm_rpc::{IntoValue, IntoValueAndType, Value, ValueAndType, WitValue};
36use golem_wasm_rpc_derive::IntoValue;
37use serde::{Deserialize, Serialize};
38use std::collections::{BTreeMap, BTreeSet};
39use std::fmt;
40use std::fmt::{Display, Formatter};
41use std::time::Duration;
42
/// Parameters of the `PublicUpdateDescription::SnapshotBased` variant.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
#[wit_transparent]
pub struct SnapshotBasedUpdateParameters {
    /// Raw bytes attached to the update description.
    // NOTE(review): presumably the serialized snapshot — confirm against the producer.
    pub payload: Vec<u8>,
}
51
/// Public description of how a worker update is performed.
///
/// Serde encodes this as an internally tagged enum whose discriminator field is `type`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicUpdateDescription {
    /// Automatic update; carries no data (`Empty`).
    #[unit_case]
    Automatic(Empty),
    /// Snapshot-based update carrying a byte payload.
    SnapshotBased(SnapshotBasedUpdateParameters),
}
61
/// Parameters of the `PublicDurableFunctionType::WriteRemoteBatched` variant.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
#[wit_transparent]
pub struct WriteRemoteBatchedParameters {
    /// Optional oplog index, copied as-is from `DurableFunctionType::WriteRemoteBatched`.
    pub index: Option<OplogIndex>,
}
70
/// Public counterpart of `DurableFunctionType` (see the `From<DurableFunctionType>` impl).
///
/// Serde encodes this as an internally tagged enum whose discriminator field is `type`.
/// Variants without data wrap `Empty`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicDurableFunctionType {
    /// Mirrors `DurableFunctionType::ReadLocal`.
    #[unit_case]
    ReadLocal(Empty),
    /// Mirrors `DurableFunctionType::WriteLocal`.
    #[unit_case]
    WriteLocal(Empty),
    /// Mirrors `DurableFunctionType::ReadRemote`.
    #[unit_case]
    ReadRemote(Empty),
    /// Mirrors `DurableFunctionType::WriteRemote`.
    #[unit_case]
    WriteRemote(Empty),
    /// Mirrors `DurableFunctionType::WriteRemoteBatched`, keeping its optional oplog index.
    WriteRemoteBatched(WriteRemoteBatchedParameters),
}
98
99impl From<DurableFunctionType> for PublicDurableFunctionType {
100 fn from(function_type: DurableFunctionType) -> Self {
101 match function_type {
102 DurableFunctionType::ReadLocal => PublicDurableFunctionType::ReadLocal(Empty {}),
103 DurableFunctionType::WriteLocal => PublicDurableFunctionType::WriteLocal(Empty {}),
104 DurableFunctionType::ReadRemote => PublicDurableFunctionType::ReadRemote(Empty {}),
105 DurableFunctionType::WriteRemote => PublicDurableFunctionType::WriteRemote(Empty {}),
106 DurableFunctionType::WriteRemoteBatched(index) => {
107 PublicDurableFunctionType::WriteRemoteBatched(WriteRemoteBatchedParameters {
108 index,
109 })
110 }
111 }
112 }
113}
114
/// Parameter object holding a single `details` string.
///
/// Serialized by serde with camelCase field names. Unlike most parameter types in this
/// module it does not derive `IntoValue`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct DetailsParameter {
    /// Free-form details text.
    pub details: String,
}
122
/// Public representation of a `RetryConfig` (see the `From<RetryConfig>` impl, which copies
/// every field one-to-one).
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct PublicRetryConfig {
    /// Copied from `RetryConfig::max_attempts`.
    pub max_attempts: u32,
    /// Copied from `RetryConfig::min_delay`; (de)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub min_delay: Duration,
    /// Copied from `RetryConfig::max_delay`; (de)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub max_delay: Duration,
    /// Copied from `RetryConfig::multiplier`.
    pub multiplier: f64,
    /// Copied from `RetryConfig::max_jitter_factor`; `None` when not configured.
    pub max_jitter_factor: Option<f64>,
}
136
137impl From<RetryConfig> for PublicRetryConfig {
138 fn from(retry_config: RetryConfig) -> Self {
139 PublicRetryConfig {
140 max_attempts: retry_config.max_attempts,
141 min_delay: retry_config.min_delay,
142 max_delay: retry_config.max_delay,
143 multiplier: retry_config.multiplier,
144 max_jitter_factor: retry_config.max_jitter_factor,
145 }
146 }
147}
148
/// Parameters of the `PublicWorkerInvocation::ExportedFunction` variant.
///
/// Serialized by serde with camelCase field names. `IntoValue` is implemented by hand for
/// this type and encodes only `idempotency_key`, `full_function_name` and `function_input`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ExportedFunctionParameters {
    /// Idempotency key of the invocation.
    pub idempotency_key: IdempotencyKey,
    /// Name of the invoked function.
    pub full_function_name: String,
    /// Typed input values, if available.
    pub function_input: Option<Vec<ValueAndType>>,
    /// Trace identifier associated with the invocation.
    pub trace_id: TraceId,
    /// Trace state entries associated with the invocation.
    pub trace_states: Vec<String>,
    /// Span data of the invocation context, as a list of span lists.
    // NOTE(review): the meaning of the outer/inner nesting is not visible here — confirm.
    pub invocation_context: Vec<Vec<PublicSpanData>>,
}
161
162impl IntoValue for ExportedFunctionParameters {
163 fn into_value(self) -> Value {
164 let wit_values: Option<Vec<WitValue>> = self
165 .function_input
166 .map(|inputs| inputs.into_iter().map(Into::into).collect());
167 Value::Record(vec![
168 self.idempotency_key.into_value(),
169 self.full_function_name.into_value(),
170 wit_values.into_value(),
171 ])
172 }
173
174 fn get_type() -> AnalysedType {
175 record(vec![
176 field("idempotency-key", IdempotencyKey::get_type()),
177 field("function-name", str()),
178 field("input", option(list(WitValue::get_type()))),
179 ])
180 }
181}
182
/// Parameters of the `PublicWorkerInvocation::ManualUpdate` variant.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
#[wit_transparent]
pub struct ManualUpdateParameters {
    /// Component version the update targets.
    pub target_version: ComponentVersion,
}
191
/// Public representation of a worker invocation, as carried by
/// `PendingWorkerInvocationParameters`.
///
/// Serde encodes this as an internally tagged enum whose discriminator field is `type`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicWorkerInvocation {
    /// Invocation of an exported function.
    ExportedFunction(ExportedFunctionParameters),
    /// Manual update to a target component version.
    ManualUpdate(ManualUpdateParameters),
}
200
/// Description of a plugin installation, combining data from a plugin definition and one of
/// its installations (see `from_definition_and_installation`).
///
/// Derives `Ord` so it can be stored in a `BTreeSet`, as `CreateParameters` and
/// `SuccessfulUpdateParameters` do. Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Eq, PartialOrd, Ord, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct PluginInstallationDescription {
    /// Identifier of the installation.
    pub installation_id: PluginInstallationId,
    /// Name taken from the plugin definition.
    pub plugin_name: String,
    /// Version taken from the plugin definition.
    pub plugin_version: String,
    /// `true` unless the plugin definition is marked as deleted.
    pub registered: bool,
    /// Installation parameters, kept in key order.
    pub parameters: BTreeMap<String, String>,
}
212
213impl PluginInstallationDescription {
214 pub fn from_definition_and_installation(
215 definition: PluginDefinition,
216 installation: PluginInstallation,
217 ) -> Self {
218 Self {
219 installation_id: installation.id,
220 plugin_name: definition.name,
221 plugin_version: definition.version,
222 parameters: installation.parameters.into_iter().collect(),
223 registered: !definition.deleted,
224 }
225 }
226}
227
/// Payload of the `PublicOplogEntry::Create` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct CreateParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Identifier of the worker.
    pub worker_id: WorkerId,
    /// Component version of the worker.
    pub component_version: ComponentVersion,
    /// Argument list.
    pub args: Vec<String>,
    /// Environment variables, kept in key order.
    pub env: BTreeMap<String, String>,
    /// Account identifier.
    pub account_id: AccountId,
    /// Parent worker, if any.
    pub parent: Option<WorkerId>,
    /// Component size.
    // NOTE(review): unit is presumably bytes — confirm.
    pub component_size: u64,
    /// Initial total linear memory size.
    // NOTE(review): unit is presumably bytes — confirm.
    pub initial_total_linear_memory_size: u64,
    /// Plugin installations initially active.
    pub initial_active_plugins: BTreeSet<PluginInstallationDescription>,
}
244
/// Payload of the `PublicOplogEntry::ImportedFunctionInvoked` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ImportedFunctionInvokedParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Name of the invoked function.
    pub function_name: String,
    /// Typed request value.
    // NOTE(review): `wit_field(convert = WitValue)` presumably makes the derived `IntoValue`
    // encode this as a `WitValue` — confirm against golem_wasm_rpc_derive.
    #[wit_field(convert = WitValue)]
    pub request: ValueAndType,
    /// Typed response value.
    #[wit_field(convert = WitValue)]
    pub response: ValueAndType,
    /// Durable function type recorded for the invocation.
    pub wrapped_function_type: PublicDurableFunctionType,
}
258
/// Payload of the `PublicAttributeValue::String` variant.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
#[wit_transparent]
pub struct StringAttributeValue {
    /// The attribute's string value.
    pub value: String,
}
267
/// Public counterpart of `AttributeValue` (see the `From<AttributeValue>` impl).
///
/// Serde encodes this as an internally tagged enum whose discriminator field is `type`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicAttributeValue {
    /// String-valued attribute.
    String(StringAttributeValue),
}
275
276impl From<AttributeValue> for PublicAttributeValue {
277 fn from(value: AttributeValue) -> Self {
278 match value {
279 AttributeValue::String(value) => {
280 PublicAttributeValue::String(StringAttributeValue { value })
281 }
282 }
283 }
284}
285
/// Payload of the `PublicSpanData::LocalSpan` variant.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct PublicLocalSpanData {
    /// Identifier of the span.
    pub span_id: SpanId,
    /// Start timestamp of the span.
    pub start: Timestamp,
    /// Identifier of the parent span, if any.
    pub parent_id: Option<SpanId>,
    /// Optional linked context, stored as a number.
    // NOTE(review): what the number refers to is not visible here — confirm with the producer.
    pub linked_context: Option<u64>,
    /// Attributes attached to the span.
    pub attributes: Vec<PublicAttribute>,
    /// Whether the span is marked as inherited.
    pub inherited: bool,
}
298
/// A single key/value span attribute.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct PublicAttribute {
    /// Attribute name.
    pub key: String,
    /// Attribute value.
    pub value: PublicAttributeValue,
}
307
/// Payload of the `PublicSpanData::ExternalSpan` variant; only the span id is recorded.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct PublicExternalSpanData {
    /// Identifier of the span.
    pub span_id: SpanId,
}
315
/// Public span data: either a local span with full details or an external span known only
/// by its id.
///
/// Serde encodes this as an internally tagged enum whose discriminator field is `type`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicSpanData {
    /// Span with start time, parent, attributes and inheritance flag.
    LocalSpan(PublicLocalSpanData),
    /// Span for which only the id is recorded.
    ExternalSpan(PublicExternalSpanData),
}
324
325impl PublicSpanData {
326 pub fn span_id(&self) -> &SpanId {
327 match self {
328 PublicSpanData::LocalSpan(data) => &data.span_id,
329 PublicSpanData::ExternalSpan(data) => &data.span_id,
330 }
331 }
332}
333
/// Payload of the `PublicOplogEntry::ExportedFunctionInvoked` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ExportedFunctionInvokedParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Name of the invoked function.
    pub function_name: String,
    /// Typed request values.
    // NOTE(review): `wit_field(convert_vec = WitValue)` presumably makes the derived
    // `IntoValue` encode each element as a `WitValue` — confirm against golem_wasm_rpc_derive.
    #[wit_field(convert_vec = WitValue)]
    pub request: Vec<ValueAndType>,
    /// Idempotency key of the invocation.
    pub idempotency_key: IdempotencyKey,
    /// Trace identifier associated with the invocation.
    pub trace_id: TraceId,
    /// Trace state entries associated with the invocation.
    pub trace_states: Vec<String>,
    /// Span data of the invocation context, as a list of span lists.
    pub invocation_context: Vec<Vec<PublicSpanData>>,
}
348
/// Payload of the `PublicOplogEntry::ExportedFunctionCompleted` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ExportedFunctionCompletedParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Typed response value, if there is one.
    #[wit_field(convert_option = WitValue)]
    pub response: Option<ValueAndType>,
    /// Consumed fuel, as a signed 64-bit number.
    pub consumed_fuel: i64,
}
359
/// Payload for oplog entries that carry nothing but a timestamp.
///
/// Used by the `Suspend`, `NoOp`, `Interrupted`, `Exited`, `BeginAtomicRegion`,
/// `BeginRemoteWrite` and `Restart` variants of `PublicOplogEntry`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct TimestampParameter {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
}
367
/// Payload of the `PublicOplogEntry::Error` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ErrorParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Error text.
    pub error: String,
}
376
/// Payload of the `PublicOplogEntry::Jump` entry.
///
/// Serialized by serde with camelCase field names. `IntoValue` is implemented by hand and
/// flattens `jump` into separate `start` and `end` fields.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct JumpParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Oplog region associated with the jump.
    pub jump: OplogRegion,
}
385
386impl IntoValue for JumpParameters {
387 fn into_value(self) -> Value {
388 Value::Record(vec![
389 self.timestamp.into_value(),
390 self.jump.start.into_value(),
391 self.jump.end.into_value(),
392 ])
393 }
394
395 fn get_type() -> AnalysedType {
396 record(vec![
397 field("timestamp", Timestamp::get_type()),
398 field("start", OplogIndex::get_type()),
399 field("end", OplogIndex::get_type()),
400 ])
401 }
402}
403
/// Payload of the `PublicOplogEntry::ChangeRetryPolicy` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ChangeRetryPolicyParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// The retry policy recorded by the entry.
    pub new_policy: PublicRetryConfig,
}
412
/// Payload of the `PublicOplogEntry::EndAtomicRegion` and `PublicOplogEntry::EndRemoteWrite`
/// entries.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct EndRegionParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Oplog index recorded as the beginning of the region.
    // NOTE(review): presumably the index of the matching `Begin*` entry — confirm.
    pub begin_index: OplogIndex,
}
421
/// Payload of the `PublicOplogEntry::PendingWorkerInvocation` entry.
///
/// Unlike most parameter types in this module, no `rename_all = "camelCase"` is applied;
/// both field names are single words, so the serialized names are the same either way.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
pub struct PendingWorkerInvocationParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// The pending invocation.
    pub invocation: PublicWorkerInvocation,
}
428
/// Payload of the `PublicOplogEntry::PendingUpdate` entry.
///
/// NOTE(review): unlike most parameter types in this module, no `rename_all = "camelCase"`
/// is applied, so serde emits `target_version` in snake_case. Changing this would alter the
/// wire format — confirm with API consumers before touching it.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
pub struct PendingUpdateParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Component version the update targets.
    pub target_version: ComponentVersion,
    /// How the update is to be performed.
    pub description: PublicUpdateDescription,
}
436
/// Payload of the `PublicOplogEntry::SuccessfulUpdate` entry.
///
/// NOTE(review): unlike most parameter types in this module, no `rename_all = "camelCase"`
/// is applied, so serde emits the multi-word field names in snake_case. Changing this would
/// alter the wire format — confirm with API consumers before touching it.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
pub struct SuccessfulUpdateParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Component version the update targeted.
    pub target_version: ComponentVersion,
    /// Component size after the update.
    // NOTE(review): unit is presumably bytes — confirm.
    pub new_component_size: u64,
    /// Plugin installations active after the update.
    pub new_active_plugins: BTreeSet<PluginInstallationDescription>,
}
445
/// Payload of the `PublicOplogEntry::FailedUpdate` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct FailedUpdateParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Component version the update targeted.
    pub target_version: ComponentVersion,
    /// Optional details about the failure.
    pub details: Option<String>,
}
455
/// Payload of the `PublicOplogEntry::GrowMemory` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct GrowMemoryParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Amount by which memory grew.
    // NOTE(review): unit is presumably bytes — confirm.
    pub delta: u64,
}
464
/// Payload of the `PublicOplogEntry::CreateResource` and `PublicOplogEntry::DropResource`
/// entries.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ResourceParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Identifier of the resource.
    pub id: WorkerResourceId,
}
473
/// Payload of the `PublicOplogEntry::DescribeResource` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct DescribeResourceParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Identifier of the resource.
    pub id: WorkerResourceId,
    /// Name of the resource.
    pub resource_name: String,
    /// Typed parameters of the resource.
    #[wit_field(convert_vec = WitValue)]
    pub resource_params: Vec<ValueAndType>,
}
485
/// Payload of the `PublicOplogEntry::Log` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct LogParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Severity of the log line.
    pub level: LogLevel,
    /// Context string of the log line.
    pub context: String,
    /// Log message text.
    pub message: String,
}
496
/// Payload of the `PublicOplogEntry::ActivatePlugin` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ActivatePluginParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// The plugin installation concerned.
    pub plugin: PluginInstallationDescription,
}
505
/// Payload of the `PublicOplogEntry::DeactivatePlugin` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct DeactivatePluginParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// The plugin installation concerned.
    pub plugin: PluginInstallationDescription,
}
514
/// Payload of the `PublicOplogEntry::Revert` entry.
///
/// Serialized by serde with camelCase field names. `IntoValue` is implemented by hand and
/// flattens `dropped_region` into separate `start` and `end` fields.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct RevertParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Oplog region recorded as dropped.
    pub dropped_region: OplogRegion,
}
523
524impl IntoValue for RevertParameters {
525 fn into_value(self) -> Value {
526 Value::Record(vec![
527 self.timestamp.into_value(),
528 self.dropped_region.start.into_value(),
529 self.dropped_region.end.into_value(),
530 ])
531 }
532
533 fn get_type() -> AnalysedType {
534 record(vec![
535 field("timestamp", Timestamp::get_type()),
536 field("start", OplogIndex::get_type()),
537 field("end", OplogIndex::get_type()),
538 ])
539 }
540}
541
/// Payload of the `PublicOplogEntry::CancelInvocation` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct CancelInvocationParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Idempotency key of the invocation concerned.
    pub idempotency_key: IdempotencyKey,
}
550
/// Payload of the `PublicOplogEntry::StartSpan` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct StartSpanParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Identifier of the span.
    pub span_id: SpanId,
    /// Identifier of the parent span, if any.
    pub parent_id: Option<SpanId>,
    /// Optional linked context, identified by a span id.
    pub linked_context: Option<SpanId>,
    /// Attributes attached to the span.
    pub attributes: Vec<PublicAttribute>,
}
562
/// Payload of the `PublicOplogEntry::FinishSpan` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct FinishSpanParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Identifier of the span.
    pub span_id: SpanId,
}
571
/// Payload of the `PublicOplogEntry::SetSpanAttribute` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct SetSpanAttributeParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// Identifier of the span.
    pub span_id: SpanId,
    /// Attribute name.
    pub key: String,
    /// Attribute value.
    pub value: PublicAttributeValue,
}
582
/// Payload of the `PublicOplogEntry::ChangePersistenceLevel` entry.
///
/// Serialized by serde with camelCase field names.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct ChangePersistenceLevelParameters {
    /// Timestamp of the entry.
    pub timestamp: Timestamp,
    /// The persistence level recorded by the entry.
    pub persistence_level: PersistenceLevel,
}
591
/// Public representation of a single oplog entry.
///
/// Each variant wraps a parameter struct holding the entry's data; every parameter struct
/// in this module includes a `timestamp`. Serde encodes this as an internally tagged enum
/// whose discriminator field is `type`.
///
/// Entries can be tested against a search query with `PublicOplogEntry::matches`.
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, IntoValue)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicOplogEntry {
    Create(CreateParameters),
    ImportedFunctionInvoked(ImportedFunctionInvokedParameters),
    ExportedFunctionInvoked(ExportedFunctionInvokedParameters),
    ExportedFunctionCompleted(ExportedFunctionCompletedParameters),
    Suspend(TimestampParameter),
    Error(ErrorParameters),
    NoOp(TimestampParameter),
    Jump(JumpParameters),
    Interrupted(TimestampParameter),
    Exited(TimestampParameter),
    ChangeRetryPolicy(ChangeRetryPolicyParameters),
    BeginAtomicRegion(TimestampParameter),
    // Carries the oplog index recorded as the beginning of the region.
    EndAtomicRegion(EndRegionParameters),
    BeginRemoteWrite(TimestampParameter),
    // Carries the oplog index recorded as the beginning of the region.
    EndRemoteWrite(EndRegionParameters),
    PendingWorkerInvocation(PendingWorkerInvocationParameters),
    PendingUpdate(PendingUpdateParameters),
    SuccessfulUpdate(SuccessfulUpdateParameters),
    FailedUpdate(FailedUpdateParameters),
    GrowMemory(GrowMemoryParameters),
    // `CreateResource` and `DropResource` share the same payload type.
    CreateResource(ResourceParameters),
    DropResource(ResourceParameters),
    DescribeResource(DescribeResourceParameters),
    Log(LogParameters),
    Restart(TimestampParameter),
    ActivatePlugin(ActivatePluginParameters),
    DeactivatePlugin(DeactivatePluginParameters),
    Revert(RevertParameters),
    CancelInvocation(CancelInvocationParameters),
    StartSpan(StartSpanParameters),
    FinishSpan(FinishSpanParameters),
    SetSpanAttribute(SetSpanAttributeParameters),
    ChangePersistenceLevel(ChangePersistenceLevelParameters),
}
681
682impl PublicOplogEntry {
683 pub fn matches(&self, query: &Query) -> bool {
684 fn matches_impl(entry: &PublicOplogEntry, query: &Query, field_stack: &[String]) -> bool {
685 match query {
686 Query::Or { queries } => queries
687 .iter()
688 .any(|query| matches_impl(entry, query, field_stack)),
689 Query::And { queries } => queries
690 .iter()
691 .all(|query| matches_impl(entry, query, field_stack)),
692 Query::Not { query } => !matches_impl(entry, query, field_stack),
693 Query::Regex { .. } => {
694 entry.matches_leaf_query(field_stack, &query.clone().try_into().unwrap())
695 }
696 Query::Term { .. } => {
697 entry.matches_leaf_query(field_stack, &query.clone().try_into().unwrap())
698 }
699 Query::Phrase { .. } => {
700 entry.matches_leaf_query(field_stack, &query.clone().try_into().unwrap())
701 }
702 Query::Field { field, query } => {
703 let mut new_stack: Vec<String> = field_stack.to_vec();
704 let parts: Vec<String> = field.split(".").map(|s| s.to_string()).collect();
705 new_stack.extend(parts);
706 matches_impl(entry, query, &new_stack)
707 }
708 }
709 }
710
711 matches_impl(self, query, &[])
712 }
713
714 fn string_match(s: &str, path: &[String], query_path: &[String], query: &LeafQuery) -> bool {
715 let lowercase_path = path
716 .iter()
717 .map(|s| s.to_lowercase())
718 .collect::<Vec<String>>();
719 let lowercase_query_path = query_path
720 .iter()
721 .map(|s| s.to_lowercase())
722 .collect::<Vec<String>>();
723 if lowercase_path == lowercase_query_path || query_path.is_empty() {
724 query.matches(s)
725 } else {
726 false
727 }
728 }
729
730 fn span_attribute_match(
731 attributes: &Vec<PublicAttribute>,
732 path_stack: &[String],
733 query_path: &[String],
734 query: &LeafQuery,
735 ) -> bool {
736 for attr in attributes {
737 let key = &attr.key;
738 let value = &attr.value;
739 let mut new_path: Vec<String> = path_stack.to_vec();
740 new_path.push(key.clone());
741
742 let vnt = match value {
743 PublicAttributeValue::String(StringAttributeValue { value }) => {
744 value.clone().into_value_and_type()
745 }
746 };
747
748 if Self::match_value(&vnt, &new_path, query_path, query) {
749 return true;
750 }
751 }
752 false
753 }
754
755 fn matches_leaf_query(&self, query_path: &[String], query: &LeafQuery) -> bool {
756 match self {
757 PublicOplogEntry::Create(_params) => {
758 Self::string_match("create", &[], query_path, query)
759 }
760 PublicOplogEntry::ImportedFunctionInvoked(params) => {
761 Self::string_match("importedfunctioninvoked", &[], query_path, query)
762 || Self::string_match("imported-function-invoked", &[], query_path, query)
763 || Self::string_match("imported-function", &[], query_path, query)
764 || Self::string_match(¶ms.function_name, &[], query_path, query)
765 || Self::match_value(¶ms.request, &[], query_path, query)
766 || Self::match_value(¶ms.response, &[], query_path, query)
767 }
768 PublicOplogEntry::ExportedFunctionInvoked(params) => {
769 Self::string_match("exportedfunctioninvoked", &[], query_path, query)
770 || Self::string_match("exported-function-invoked", &[], query_path, query)
771 || Self::string_match("exported-function", &[], query_path, query)
772 || Self::string_match(¶ms.function_name, &[], query_path, query)
773 || params
774 .request
775 .iter()
776 .any(|v| Self::match_value(v, &[], query_path, query))
777 || Self::string_match(¶ms.idempotency_key.value, &[], query_path, query)
778 }
779 PublicOplogEntry::ExportedFunctionCompleted(params) => {
780 Self::string_match("exportedfunctioncompleted", &[], query_path, query)
781 || Self::string_match("exported-function-completed", &[], query_path, query)
782 || Self::string_match("exported-function", &[], query_path, query)
783 || match ¶ms.response {
784 Some(response) => Self::match_value(response, &[], query_path, query),
785 None => false,
786 }
787 }
789 PublicOplogEntry::Suspend(_params) => {
790 Self::string_match("suspend", &[], query_path, query)
791 }
792 PublicOplogEntry::Error(params) => {
793 Self::string_match("error", &[], query_path, query)
794 || Self::string_match(¶ms.error, &[], query_path, query)
795 }
796 PublicOplogEntry::NoOp(_params) => Self::string_match("noop", &[], query_path, query),
797 PublicOplogEntry::Jump(_params) => Self::string_match("jump", &[], query_path, query),
798 PublicOplogEntry::Interrupted(_params) => {
799 Self::string_match("interrupted", &[], query_path, query)
800 }
801 PublicOplogEntry::Exited(_params) => {
802 Self::string_match("exited", &[], query_path, query)
803 }
804 PublicOplogEntry::ChangeRetryPolicy(_params) => {
805 Self::string_match("changeretrypolicy", &[], query_path, query)
806 || Self::string_match("change-retry-policy", &[], query_path, query)
807 }
808 PublicOplogEntry::BeginAtomicRegion(_params) => {
809 Self::string_match("beginatomicregion", &[], query_path, query)
810 || Self::string_match("begin-atomic-region", &[], query_path, query)
811 }
812 PublicOplogEntry::EndAtomicRegion(_params) => {
813 Self::string_match("endatomicregion", &[], query_path, query)
814 || Self::string_match("end-atomic-region", &[], query_path, query)
815 }
816 PublicOplogEntry::BeginRemoteWrite(_params) => {
817 Self::string_match("beginremotewrite", &[], query_path, query)
818 || Self::string_match("begin-remote-write", &[], query_path, query)
819 }
820 PublicOplogEntry::EndRemoteWrite(_params) => {
821 Self::string_match("endremotewrite", &[], query_path, query)
822 || Self::string_match("end-remote-write", &[], query_path, query)
823 }
824 PublicOplogEntry::PendingWorkerInvocation(params) => {
825 Self::string_match("pendingworkerinvocation", &[], query_path, query)
826 || Self::string_match("pending-worker-invocation", &[], query_path, query)
827 || match ¶ms.invocation {
828 PublicWorkerInvocation::ExportedFunction(params) => {
829 Self::string_match(¶ms.full_function_name, &[], query_path, query)
830 || Self::string_match(
831 ¶ms.idempotency_key.value,
832 &[],
833 query_path,
834 query,
835 )
836 || params
837 .function_input
838 .as_ref()
839 .map(|params| {
840 params
841 .iter()
842 .any(|v| Self::match_value(v, &[], query_path, query))
843 })
844 .unwrap_or(false)
845 }
846 PublicWorkerInvocation::ManualUpdate(params) => Self::string_match(
847 ¶ms.target_version.to_string(),
848 &[],
849 query_path,
850 query,
851 ),
852 }
853 }
854 PublicOplogEntry::PendingUpdate(params) => {
855 Self::string_match("pendingupdate", &[], query_path, query)
856 || Self::string_match("pending-update", &[], query_path, query)
857 || Self::string_match("update", &[], query_path, query)
858 || Self::string_match(
859 ¶ms.target_version.to_string(),
860 &[],
861 query_path,
862 query,
863 )
864 }
865 PublicOplogEntry::SuccessfulUpdate(params) => {
866 Self::string_match("successfulupdate", &[], query_path, query)
867 || Self::string_match("successful-update", &[], query_path, query)
868 || Self::string_match("update", &[], query_path, query)
869 || Self::string_match(
870 ¶ms.target_version.to_string(),
871 &[],
872 query_path,
873 query,
874 )
875 }
876 PublicOplogEntry::FailedUpdate(params) => {
877 Self::string_match("failedupdate", &[], query_path, query)
878 || Self::string_match("failed-update", &[], query_path, query)
879 || Self::string_match("update", &[], query_path, query)
880 || Self::string_match(
881 ¶ms.target_version.to_string(),
882 &[],
883 query_path,
884 query,
885 )
886 || params
887 .details
888 .as_ref()
889 .map(|details| Self::string_match(details, &[], query_path, query))
890 .unwrap_or(false)
891 }
892 PublicOplogEntry::GrowMemory(_params) => {
893 Self::string_match("growmemory", &[], query_path, query)
894 || Self::string_match("grow-memory", &[], query_path, query)
895 }
896 PublicOplogEntry::CreateResource(_params) => {
897 Self::string_match("createresource", &[], query_path, query)
898 || Self::string_match("create-resource", &[], query_path, query)
899 }
900 PublicOplogEntry::DropResource(_params) => {
901 Self::string_match("dropresource", &[], query_path, query)
902 || Self::string_match("drop-resource", &[], query_path, query)
903 }
904 PublicOplogEntry::DescribeResource(params) => {
905 Self::string_match("describeresource", &[], query_path, query)
906 || Self::string_match("describe-resource", &[], query_path, query)
907 || Self::string_match(¶ms.resource_name, &[], query_path, query)
908 || params
909 .resource_params
910 .iter()
911 .any(|v| Self::match_value(v, &[], query_path, query))
912 }
913 PublicOplogEntry::Log(params) => {
914 Self::string_match("log", &[], query_path, query)
915 || Self::string_match(¶ms.context, &[], query_path, query)
916 || Self::string_match(¶ms.message, &[], query_path, query)
917 }
918 PublicOplogEntry::Restart(_params) => {
919 Self::string_match("restart", &[], query_path, query)
920 }
921 PublicOplogEntry::ActivatePlugin(_params) => {
922 Self::string_match("activateplugin", &[], query_path, query)
923 || Self::string_match("activate-plugin", &[], query_path, query)
924 }
925 PublicOplogEntry::DeactivatePlugin(_params) => {
926 Self::string_match("deactivateplugin", &[], query_path, query)
927 || Self::string_match("deactivate-plugin", &[], query_path, query)
928 }
929 PublicOplogEntry::Revert(_params) => {
930 Self::string_match("revert", &[], query_path, query)
931 }
932 PublicOplogEntry::CancelInvocation(params) => {
933 Self::string_match("cancel", &[], query_path, query)
934 || Self::string_match("cancel-invocation", &[], query_path, query)
935 || Self::string_match(¶ms.idempotency_key.value, &[], query_path, query)
936 }
937 PublicOplogEntry::StartSpan(params) => {
938 Self::string_match("startspan", &[], query_path, query)
939 || Self::string_match("start-span", &[], query_path, query)
940 || Self::string_match(¶ms.span_id.to_string(), &[], query_path, query)
941 || Self::string_match(
942 ¶ms
943 .parent_id
944 .as_ref()
945 .map(|id| id.to_string())
946 .unwrap_or_default(),
947 &[],
948 query_path,
949 query,
950 )
951 || Self::string_match(
952 ¶ms
953 .linked_context
954 .as_ref()
955 .map(|id| id.to_string())
956 .unwrap_or_default(),
957 &[],
958 query_path,
959 query,
960 )
961 || Self::span_attribute_match(¶ms.attributes, &[], query_path, query)
962 }
963 PublicOplogEntry::FinishSpan(params) => {
964 Self::string_match("finishspan", &[], query_path, query)
965 || Self::string_match("finish-span", &[], query_path, query)
966 || Self::string_match(¶ms.span_id.to_string(), &[], query_path, query)
967 }
968 PublicOplogEntry::SetSpanAttribute(params) => {
969 let attributes = vec![PublicAttribute {
970 key: params.key.clone(),
971 value: params.value.clone(),
972 }];
973 Self::string_match("setspanattribute", &[], query_path, query)
974 || Self::string_match("set-span-attribute", &[], query_path, query)
975 || Self::string_match(¶ms.key, &[], query_path, query)
976 || Self::span_attribute_match(&attributes, &[], query_path, query)
977 }
978 PublicOplogEntry::ChangePersistenceLevel(_params) => {
979 Self::string_match("changepersistencelevel", &[], query_path, query)
980 || Self::string_match("change-persistence-level", &[], query_path, query)
981 || Self::string_match("persistence-level", &[], query_path, query)
982 }
983 }
984 }
985
986 fn match_value(
987 value: &ValueAndType,
988 path_stack: &[String],
989 query_path: &[String],
990 query: &LeafQuery,
991 ) -> bool {
992 match (&value.value, &value.typ) {
993 (Value::Bool(value), _) => {
994 Self::string_match(&value.to_string(), path_stack, query_path, query)
995 }
996 (Value::U8(value), _) => {
997 Self::string_match(&value.to_string(), path_stack, query_path, query)
998 }
999 (Value::U16(value), _) => {
1000 Self::string_match(&value.to_string(), path_stack, query_path, query)
1001 }
1002 (Value::U32(value), _) => {
1003 Self::string_match(&value.to_string(), path_stack, query_path, query)
1004 }
1005 (Value::U64(value), _) => {
1006 Self::string_match(&value.to_string(), path_stack, query_path, query)
1007 }
1008 (Value::S8(value), _) => {
1009 Self::string_match(&value.to_string(), path_stack, query_path, query)
1010 }
1011 (Value::S16(value), _) => {
1012 Self::string_match(&value.to_string(), path_stack, query_path, query)
1013 }
1014 (Value::S32(value), _) => {
1015 Self::string_match(&value.to_string(), path_stack, query_path, query)
1016 }
1017 (Value::S64(value), _) => {
1018 Self::string_match(&value.to_string(), path_stack, query_path, query)
1019 }
1020 (Value::F32(value), _) => {
1021 Self::string_match(&value.to_string(), path_stack, query_path, query)
1022 }
1023 (Value::F64(value), _) => {
1024 Self::string_match(&value.to_string(), path_stack, query_path, query)
1025 }
1026 (Value::Char(value), _) => {
1027 Self::string_match(&value.to_string(), path_stack, query_path, query)
1028 }
1029 (Value::String(value), _) => {
1030 Self::string_match(&value.to_string(), path_stack, query_path, query)
1031 }
1032 (Value::List(elems), AnalysedType::List(list)) => elems.iter().any(|v| {
1033 Self::match_value(
1034 &ValueAndType::new(v.clone(), (*list.inner).clone()),
1035 path_stack,
1036 query_path,
1037 query,
1038 )
1039 }),
1040 (Value::Tuple(elems), AnalysedType::Tuple(tuple)) => {
1041 if elems.len() != tuple.items.len() {
1042 false
1043 } else {
1044 elems
1045 .iter()
1046 .zip(tuple.items.iter())
1047 .enumerate()
1048 .any(|(idx, (v, t))| {
1049 let mut new_path: Vec<String> = path_stack.to_vec();
1050 new_path.push(idx.to_string());
1051 Self::match_value(
1052 &ValueAndType::new(v.clone(), t.clone()),
1053 &new_path,
1054 query_path,
1055 query,
1056 )
1057 })
1058 }
1059 }
1060 (Value::Record(fields), AnalysedType::Record(record)) => {
1061 if fields.len() != record.fields.len() {
1062 false
1063 } else {
1064 fields.iter().zip(record.fields.iter()).any(|(v, t)| {
1065 let mut new_path: Vec<String> = path_stack.to_vec();
1066 new_path.push(t.name.clone());
1067 Self::match_value(
1068 &ValueAndType::new(v.clone(), t.typ.clone()),
1069 &new_path,
1070 path_stack,
1071 query,
1072 )
1073 })
1074 }
1075 }
1076 (
1077 Value::Variant {
1078 case_value,
1079 case_idx,
1080 },
1081 AnalysedType::Variant(variant),
1082 ) => {
1083 let case = variant.cases.get(*case_idx as usize);
1084 match (case_value, case) {
1085 (
1086 Some(value),
1087 Some(NameOptionTypePair {
1088 typ: Some(typ),
1089 name,
1090 }),
1091 ) => {
1092 let mut new_path: Vec<String> = path_stack.to_vec();
1093 new_path.push(name.clone());
1094 Self::match_value(
1095 &ValueAndType::new((**value).clone(), typ.clone()),
1096 &new_path,
1097 query_path,
1098 query,
1099 )
1100 }
1101 _ => false,
1102 }
1103 }
1104 (Value::Enum(value), AnalysedType::Enum(typ)) => {
1105 if let Some(case) = typ.cases.get(*value as usize) {
1106 Self::string_match(case, path_stack, query_path, query)
1107 } else {
1108 false
1109 }
1110 }
1111 (Value::Flags(bitmap), AnalysedType::Flags(flags)) => {
1112 let names = bitmap
1113 .iter()
1114 .enumerate()
1115 .filter_map(|(idx, set)| if *set { flags.names.get(idx) } else { None })
1116 .collect::<Vec<_>>();
1117 names
1118 .iter()
1119 .any(|name| Self::string_match(name, path_stack, query_path, query))
1120 }
1121 (Value::Option(Some(value)), AnalysedType::Option(typ)) => Self::match_value(
1122 &ValueAndType::new((**value).clone(), (*typ.inner).clone()),
1123 path_stack,
1124 query_path,
1125 query,
1126 ),
1127 (Value::Result(value), AnalysedType::Result(typ)) => match value {
1128 Ok(Some(value)) if typ.ok.is_some() => {
1129 let mut new_path = path_stack.to_vec();
1130 new_path.push("ok".to_string());
1131 Self::match_value(
1132 &ValueAndType::new(
1133 (**value).clone(),
1134 (**(typ.ok.as_ref().unwrap())).clone(),
1135 ),
1136 &new_path,
1137 query_path,
1138 query,
1139 )
1140 }
1141 Err(Some(value)) if typ.err.is_some() => {
1142 let mut new_path = path_stack.to_vec();
1143 new_path.push("err".to_string());
1144 Self::match_value(
1145 &ValueAndType::new(
1146 (**value).clone(),
1147 (**(typ.err.as_ref().unwrap())).clone(),
1148 ),
1149 &new_path,
1150 query_path,
1151 query,
1152 )
1153 }
1154 _ => false,
1155 },
1156 (Value::Handle { .. }, _) => false,
1157 _ => false,
1158 }
1159 }
1160}
1161
// Cursor made of an oplog index and a component version, both `u64`.
//
// Serialized through serde with camelCase field names. Its `Display` impl renders
// it as `<next_oplog_index>-<current_component_version>`, and with the `poem`
// feature enabled the same textual form is accepted as a request parameter.
//
// NOTE(review): plain `//` comments are used on purpose; `///` doc comments on a
// `poem_openapi::Object` are presumably picked up as schema descriptions - confirm
// before converting these.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct OplogCursor {
    // Presumably the index of the next oplog entry to be read - verify against callers.
    pub next_oplog_index: u64,
    // Presumably the component version in effect at that position - verify against callers.
    pub current_component_version: u64,
}
1170
#[cfg(feature = "poem")]
impl poem_openapi::types::ParseFromParameter for OplogCursor {
    /// Parses a cursor written as `<next_oplog_index>-<current_component_version>`.
    ///
    /// The input has to split on `-` into exactly two segments; anything else is
    /// rejected as an invalid cursor. Each segment must then parse as the numeric
    /// type of the corresponding field, otherwise a segment-specific error is
    /// returned.
    fn parse_from_parameter(value: &str) -> poem_openapi::types::ParseResult<Self> {
        let mut segments = value.split('-');

        // Exactly two segments: a third `next()` must come back empty.
        let (index_part, version_part) =
            match (segments.next(), segments.next(), segments.next()) {
                (Some(index), Some(version), None) => (index, version),
                _ => return Err("Invalid oplog cursor".into()),
            };

        let next_oplog_index = index_part
            .parse()
            .map_err(|_| "Invalid index in the oplog cursor")?;
        let current_component_version = version_part
            .parse()
            .map_err(|_| "Invalid component version in the oplog cursor")?;

        Ok(Self {
            next_oplog_index,
            current_component_version,
        })
    }
}
1190
1191impl Display for OplogCursor {
1192 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1193 write!(
1194 f,
1195 "{}-{}",
1196 self.next_oplog_index, self.current_component_version
1197 )
1198 }
1199}