obeli_sk_concepts/
storage.rs

1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ComponentRetryConfig;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FunctionFqn;
7use crate::JoinSetId;
8use crate::Params;
9use crate::StrVariant;
10use crate::SupportedFunctionReturnValue;
11use crate::prefixed_ulid::DelayId;
12use crate::prefixed_ulid::ExecutionIdDerived;
13use crate::prefixed_ulid::ExecutorId;
14use crate::prefixed_ulid::RunId;
15use assert_matches::assert_matches;
16use async_trait::async_trait;
17use chrono::TimeDelta;
18use chrono::{DateTime, Utc};
19use http_client_trace::HttpClientTrace;
20use serde::Deserialize;
21use serde::Serialize;
22use std::fmt::Debug;
23use std::fmt::Display;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::Duration;
28use strum::IntoStaticStr;
29use tracing::error;
30
31/// Remote client representation of the execution journal.
32#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
33pub struct ExecutionLog {
34    pub execution_id: ExecutionId,
35    pub events: Vec<ExecutionEvent>,
36    pub responses: Vec<JoinSetResponseEventOuter>,
37    pub next_version: Version, // Is not advanced once in Finished state
38    pub pending_state: PendingState,
39}
40
41impl ExecutionLog {
42    #[must_use]
43    pub fn can_be_retried_after(
44        temporary_event_count: u32,
45        max_retries: u32,
46        retry_exp_backoff: Duration,
47    ) -> Option<Duration> {
48        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
49        if temporary_event_count <= max_retries {
50            // TODO: Add test for number of retries
51            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
52            Some(duration)
53        } else {
54            None
55        }
56    }
57
58    #[must_use]
59    pub fn compute_retry_duration_when_retrying_forever(
60        temporary_event_count: u32,
61        retry_exp_backoff: Duration,
62    ) -> Duration {
63        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
64            .expect("`max_retries` set to MAX must never return None")
65    }
66
67    #[must_use]
68    pub fn ffqn(&self) -> &FunctionFqn {
69        assert_matches!(self.events.first(), Some(ExecutionEvent {
70            event: ExecutionEventInner::Created { ffqn, .. },
71            ..
72        }) => ffqn)
73    }
74
75    #[must_use]
76    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
77        assert_matches!(self.events.first(), Some(ExecutionEvent {
78            event: ExecutionEventInner::Created { parent, .. },
79            ..
80        }) => parent.clone())
81    }
82
83    #[must_use]
84    pub fn last_event(&self) -> &ExecutionEvent {
85        self.events.last().expect("must contain at least one event")
86    }
87
88    #[must_use]
89    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
90        if let ExecutionEvent {
91            event: ExecutionEventInner::Finished { result, .. },
92            ..
93        } = self.events.pop().expect("must contain at least one event")
94        {
95            Some(result)
96        } else {
97            None
98        }
99    }
100
101    pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
102        self.events.iter().filter_map(|event| {
103            if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
104                Some(eh.clone())
105            } else {
106                None
107            }
108        })
109    }
110
111    #[cfg(feature = "test")]
112    #[must_use]
113    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
114        self.events
115            .iter()
116            .find_map(move |event| match &event.event {
117                ExecutionEventInner::HistoryEvent {
118                    event:
119                        HistoryEvent::JoinSetRequest {
120                            join_set_id: found,
121                            request,
122                        },
123                    ..
124                } if *join_set_id == *found => Some(request),
125                _ => None,
126            })
127    }
128}
129
130pub type VersionType = u32;
131#[derive(
132    Debug,
133    Default,
134    Clone,
135    PartialEq,
136    Eq,
137    Hash,
138    derive_more::Display,
139    derive_more::Into,
140    serde::Serialize,
141    serde::Deserialize,
142)]
143#[serde(transparent)]
144pub struct Version(pub VersionType);
145impl Version {
146    #[must_use]
147    pub fn new(arg: VersionType) -> Version {
148        Version(arg)
149    }
150
151    #[must_use]
152    pub fn increment(&self) -> Version {
153        Version(self.0 + 1)
154    }
155}
156
157#[derive(
158    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
159)]
160#[display("{event}")]
161pub struct ExecutionEvent {
162    pub created_at: DateTime<Utc>,
163    pub event: ExecutionEventInner,
164    pub backtrace_id: Option<Version>,
165}
166
167/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
168#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
169pub struct JoinSetResponseEventOuter {
170    pub created_at: DateTime<Utc>,
171    pub event: JoinSetResponseEvent,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
175pub struct JoinSetResponseEvent {
176    pub join_set_id: JoinSetId,
177    pub event: JoinSetResponse,
178}
179
180#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
181#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
182#[serde(tag = "type")]
183pub enum JoinSetResponse {
184    DelayFinished {
185        delay_id: DelayId,
186    },
187    ChildExecutionFinished {
188        child_execution_id: ExecutionIdDerived,
189        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
190        finished_version: Version,
191        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
192        result: SupportedFunctionReturnValue,
193    },
194}
195
196pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
197    ffqn: FunctionFqn::new_static("", ""),
198    params: Params::empty(),
199    parent: None,
200    scheduled_at: DateTime::from_timestamp_nanos(0),
201    component_id: ComponentId::dummy_activity(),
202    metadata: ExecutionMetadata::empty(),
203    scheduled_by: None,
204};
205pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
206    event: HistoryEvent::JoinSetCreate {
207        join_set_id: JoinSetId {
208            kind: crate::JoinSetKind::OneOff,
209            name: StrVariant::empty(),
210        },
211        closing_strategy: ClosingStrategy::Complete,
212    },
213};
214
215#[derive(
216    Clone,
217    derive_more::Debug,
218    derive_more::Display,
219    PartialEq,
220    Eq,
221    Serialize,
222    Deserialize,
223    IntoStaticStr,
224)]
225#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
226#[allow(clippy::large_enum_variant)]
227pub enum ExecutionEventInner {
228    #[display("Created({ffqn}, `{scheduled_at}`)")]
229    Created {
230        ffqn: FunctionFqn,
231        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
232        #[debug(skip)]
233        params: Params,
234        parent: Option<(ExecutionId, JoinSetId)>,
235        scheduled_at: DateTime<Utc>,
236        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
237        component_id: ComponentId,
238        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
239        metadata: ExecutionMetadata,
240        scheduled_by: Option<ExecutionId>,
241    },
242    #[display("Locked(`{lock_expires_at}`, {component_id})")]
243    Locked {
244        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
245        component_id: ComponentId,
246        executor_id: ExecutorId,
247        run_id: RunId,
248        lock_expires_at: DateTime<Utc>,
249        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
250        retry_config: ComponentRetryConfig,
251    },
252    /// Returns execution to [`PendingState::PendingNow`] state
253    /// without timing out. This can happen when the executor is running
254    /// out of resources like [`WorkerError::LimitReached`] or when
255    /// the executor is shutting down.
256    #[display("Unlocked(`{backoff_expires_at}`)")]
257    Unlocked {
258        backoff_expires_at: DateTime<Utc>,
259        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
260        reason: StrVariant,
261    },
262    // Created by the executor holding the lock.
263    // After expiry interpreted as pending.
264    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
265    TemporarilyFailed {
266        backoff_expires_at: DateTime<Utc>,
267        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
268        reason_full: StrVariant,
269        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
270        reason_inner: StrVariant,
271        detail: Option<String>,
272        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
273        http_client_traces: Option<Vec<HttpClientTrace>>,
274    },
275    // Created by the executor holding the lock.
276    // After expiry interpreted as pending.
277    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
278    TemporarilyTimedOut {
279        backoff_expires_at: DateTime<Utc>,
280        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
281        http_client_traces: Option<Vec<HttpClientTrace>>,
282    },
283    // Created by the executor holding the lock.
284    #[display("Finished")]
285    Finished {
286        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
287        result: SupportedFunctionReturnValue,
288        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
289        http_client_traces: Option<Vec<HttpClientTrace>>,
290    },
291
292    #[display("HistoryEvent({event})")]
293    HistoryEvent { event: HistoryEvent },
294}
295
296impl ExecutionEventInner {
297    #[must_use]
298    pub fn is_temporary_event(&self) -> bool {
299        matches!(
300            self,
301            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
302        )
303    }
304
305    #[must_use]
306    pub fn variant(&self) -> &'static str {
307        Into::<&'static str>::into(self)
308    }
309
310    #[must_use]
311    pub fn join_set_id(&self) -> Option<&JoinSetId> {
312        match self {
313            Self::Created {
314                parent: Some((_parent_id, join_set_id)),
315                ..
316            } => Some(join_set_id),
317            Self::HistoryEvent {
318                event:
319                    HistoryEvent::JoinSetCreate { join_set_id, .. }
320                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
321                    | HistoryEvent::JoinNext { join_set_id, .. },
322            } => Some(join_set_id),
323            _ => None,
324        }
325    }
326}
327
328#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
329#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
330#[serde(tag = "type")]
331pub enum PersistKind {
332    #[display("RandomU64({min}, {max_inclusive})")]
333    RandomU64 { min: u64, max_inclusive: u64 },
334    #[display("RandomString({min_length}, {max_length_exclusive})")]
335    RandomString {
336        min_length: u64,
337        max_length_exclusive: u64,
338    },
339}
340
341#[must_use]
342pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
343    value.to_be_bytes()
344}
345
346#[must_use]
347pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
348    u64::from_be_bytes(bytes)
349}
350
351#[derive(
352    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
353)]
354#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
355#[serde(tag = "type")]
356/// Must be created by the executor in [`PendingState::Locked`].
357pub enum HistoryEvent {
358    /// Persist a generated pseudorandom value.
359    #[display("Persist")]
360    Persist {
361        #[debug(skip)]
362        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
363        kind: PersistKind,
364    },
365    #[display("JoinSetCreate({join_set_id})")]
366    JoinSetCreate {
367        join_set_id: JoinSetId,
368        closing_strategy: ClosingStrategy,
369    },
370    #[display("JoinSetRequest({request})")]
371    // join_set_id is part of ExecutionId or DelayId in the `request`
372    JoinSetRequest {
373        join_set_id: JoinSetId,
374        request: JoinSetRequest,
375    },
376    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
377    /// When the response arrives at `resp_time`:
378    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
379    /// original executor can continue. After the expiry any executor can continue without
380    /// marking the execution as timed out.
381    #[display("JoinNext({join_set_id})")]
382    JoinNext {
383        join_set_id: JoinSetId,
384        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
385        /// The pending status will be kept in Locked state until `run_expires_at`.
386        run_expires_at: DateTime<Utc>,
387        /// Set to a specific function when calling `-await-next` extension function, used for
388        /// determinism checks.
389        requested_ffqn: Option<FunctionFqn>,
390        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
391        closing: bool,
392    },
393    /// Records the fact that a join set was awaited more times than its submission count.
394    #[display("JoinNextTooMany({join_set_id})")]
395    JoinNextTooMany {
396        join_set_id: JoinSetId,
397        /// Set to a specific function when calling `-await-next` extension function, used for
398        /// determinism checks.
399        requested_ffqn: Option<FunctionFqn>,
400    },
401    #[display("Schedule({execution_id}, {schedule_at})")]
402    Schedule {
403        execution_id: ExecutionId,
404        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
405    },
406    #[display("Stub({target_execution_id})")]
407    Stub {
408        target_execution_id: ExecutionIdDerived,
409        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
410        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
411        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
412    },
413}
414
415#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
416#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
417pub enum HistoryEventScheduleAt {
418    Now,
419    #[display("At(`{_0}`)")]
420    At(DateTime<Utc>),
421    #[display("In({_0:?})")]
422    In(Duration),
423}
424
425#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
426pub enum ScheduleAtConversionError {
427    #[error("source duration value is out of range")]
428    OutOfRangeError,
429}
430
431impl HistoryEventScheduleAt {
432    pub fn as_date_time(
433        &self,
434        now: DateTime<Utc>,
435    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
436        match self {
437            Self::Now => Ok(now),
438            Self::At(date_time) => Ok(*date_time),
439            Self::In(duration) => {
440                let time_delta = TimeDelta::from_std(*duration)
441                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
442                now.checked_add_signed(time_delta)
443                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
444            }
445        }
446    }
447}
448
449#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
450#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
451#[serde(tag = "type")]
452pub enum JoinSetRequest {
453    // Must be created by the executor in `PendingState::Locked`.
454    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
455    DelayRequest {
456        delay_id: DelayId,
457        expires_at: DateTime<Utc>,
458        schedule_at: HistoryEventScheduleAt,
459    },
460    // Must be created by the executor in `PendingState::Locked`.
461    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
462    ChildExecutionRequest {
463        child_execution_id: ExecutionIdDerived,
464        target_ffqn: FunctionFqn,
465        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
466        params: Params,
467    },
468}
469
470/// Error that is not specific to an execution.
471#[derive(Debug, Clone, thiserror::Error, PartialEq)]
472pub enum DbErrorGeneric {
473    #[error("database error: {0}")]
474    Uncategorized(StrVariant), // Previously GenericError
475    #[error("database was closed")]
476    Close,
477}
478
479#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
480pub enum DbErrorWritePermanent {
481    /// Parameter error.
482    #[error("validation failed: {0}")]
483    ValidationFailed(StrVariant),
484    #[error("cannot write: {reason}")]
485    CannotWrite {
486        reason: StrVariant,
487        expected_version: Option<Version>,
488    },
489}
490
491/// Write error tied to an execution
492#[derive(Debug, Clone, thiserror::Error, PartialEq)]
493pub enum DbErrorWrite {
494    #[error("cannot write - row not found")]
495    NotFound,
496    /// Retrying will not resolve the error, the execution is in an incorrect state.
497    #[error("permanent error: {0}")]
498    Permanent(#[from] DbErrorWritePermanent),
499    #[error(transparent)]
500    DbErrorGeneric(#[from] DbErrorGeneric),
501}
502
503/// Read error tied to an execution
504#[derive(Debug, Clone, thiserror::Error, PartialEq)]
505pub enum DbErrorRead {
506    #[error("cannot read - row not found")]
507    NotFound,
508    #[error(transparent)]
509    DbErrorGeneric(#[from] DbErrorGeneric),
510}
511
512impl From<DbErrorRead> for DbErrorWrite {
513    fn from(value: DbErrorRead) -> DbErrorWrite {
514        match value {
515            DbErrorRead::NotFound => DbErrorWrite::NotFound,
516            DbErrorRead::DbErrorGeneric(err) => DbErrorWrite::DbErrorGeneric(err),
517        }
518    }
519}
520
521#[derive(Debug, thiserror::Error, PartialEq)]
522pub enum DbErrorReadWithTimeout {
523    #[error("timeout")]
524    Timeout,
525    #[error(transparent)]
526    DbErrorRead(#[from] DbErrorRead),
527}
528impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
529    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
530        Self::from(DbErrorRead::from(value))
531    }
532}
533
534// Represents next version after successfuly appended to execution log.
535// TODO: Convert to struct with next_version
536pub type AppendResponse = Version;
537pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
538
539#[derive(Debug, Clone)]
540pub struct LockedExecution {
541    pub execution_id: ExecutionId,
542    pub next_version: Version,
543    pub metadata: ExecutionMetadata,
544    pub run_id: RunId,
545    pub ffqn: FunctionFqn,
546    pub params: Params,
547    pub event_history: Vec<HistoryEvent>, // FIXME: Add version
548    pub responses: Vec<JoinSetResponseEventOuter>,
549    pub parent: Option<(ExecutionId, JoinSetId)>,
550    pub intermittent_event_count: u32,
551}
552
553pub type LockPendingResponse = Vec<LockedExecution>;
554pub type AppendBatchResponse = Version;
555
556#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
557#[display("{event}")]
558pub struct AppendRequest {
559    pub created_at: DateTime<Utc>,
560    pub event: ExecutionEventInner,
561}
562
563#[derive(Debug, Clone)]
564pub struct CreateRequest {
565    pub created_at: DateTime<Utc>,
566    pub execution_id: ExecutionId,
567    pub ffqn: FunctionFqn,
568    pub params: Params,
569    pub parent: Option<(ExecutionId, JoinSetId)>,
570    pub scheduled_at: DateTime<Utc>,
571    pub component_id: ComponentId,
572    pub metadata: ExecutionMetadata,
573    pub scheduled_by: Option<ExecutionId>,
574}
575
576impl From<CreateRequest> for ExecutionEventInner {
577    fn from(value: CreateRequest) -> Self {
578        Self::Created {
579            ffqn: value.ffqn,
580            params: value.params,
581            parent: value.parent,
582            scheduled_at: value.scheduled_at,
583            component_id: value.component_id,
584            metadata: value.metadata,
585            scheduled_by: value.scheduled_by,
586        }
587    }
588}
589
590#[async_trait]
591pub trait DbPool: Send + Sync {
592    fn connection(&self) -> Box<dyn DbConnection>;
593}
594
595#[async_trait]
596pub trait DbPoolCloseable {
597    async fn close(self);
598}
599
600#[derive(Clone, Debug)]
601pub struct AppendEventsToExecution {
602    pub execution_id: ExecutionId,
603    pub version: Version,
604    pub batch: Vec<AppendRequest>,
605}
606
607#[derive(Clone, Debug)]
608pub struct AppendResponseToExecution {
609    pub parent_execution_id: ExecutionId,
610    pub parent_response_event: JoinSetResponseEventOuter,
611}
612
613#[async_trait]
614pub trait DbExecutor: Send + Sync {
615    #[expect(clippy::too_many_arguments)]
616    async fn lock_pending(
617        &self,
618        batch_size: usize,
619        pending_at_or_sooner: DateTime<Utc>,
620        ffqns: Arc<[FunctionFqn]>,
621        created_at: DateTime<Utc>,
622        component_id: ComponentId,
623        executor_id: ExecutorId,
624        lock_expires_at: DateTime<Utc>,
625        run_id: RunId,
626        retry_config: ComponentRetryConfig,
627    ) -> Result<LockPendingResponse, DbErrorGeneric>;
628
629    /// Specialized locking for e.g. extending the lock by the original executor and run.
630    #[expect(clippy::too_many_arguments)]
631    async fn lock_one(
632        &self,
633        created_at: DateTime<Utc>,
634        component_id: ComponentId,
635        execution_id: &ExecutionId,
636        run_id: RunId,
637        version: Version,
638        executor_id: ExecutorId,
639        lock_expires_at: DateTime<Utc>,
640        retry_config: ComponentRetryConfig,
641    ) -> Result<LockedExecution, DbErrorWrite>;
642
643    /// Append a single event to an existing execution log.
644    /// The request cannot contain `ExecutionEventInner::Created`.
645    async fn append(
646        &self,
647        execution_id: ExecutionId,
648        version: Version,
649        req: AppendRequest,
650    ) -> Result<AppendResponse, DbErrorWrite>;
651
652    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
653    /// The batch cannot contain `ExecutionEventInner::Created`.
654    async fn append_batch_respond_to_parent(
655        &self,
656        events: AppendEventsToExecution,
657        response: AppendResponseToExecution,
658        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
659    ) -> Result<AppendBatchResponse, DbErrorWrite>;
660
661    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
662    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
663    /// Otherwise wait until `timeout_fut` resolves.
664    /// Timers that expire between `pending_at_or_sooner` and timeout can be disregarded.
665    async fn wait_for_pending(
666        &self,
667        pending_at_or_sooner: DateTime<Utc>,
668        ffqns: Arc<[FunctionFqn]>,
669        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
670    );
671}
672
673#[async_trait]
674pub trait DbConnection: DbExecutor {
675    async fn append_response(
676        &self,
677        created_at: DateTime<Utc>,
678        execution_id: ExecutionId,
679        response_event: JoinSetResponseEvent,
680    ) -> Result<(), DbErrorWrite>;
681
682    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
683    /// The batch cannot contain `ExecutionEventInner::Created`.
684    async fn append_batch(
685        &self,
686        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
687        batch: Vec<AppendRequest>,
688        execution_id: ExecutionId,
689        version: Version,
690    ) -> Result<AppendBatchResponse, DbErrorWrite>;
691
692    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
693    /// The batch cannot contain `ExecutionEventInner::Created`.
694    async fn append_batch_create_new_execution(
695        &self,
696        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
697        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
698        execution_id: ExecutionId,
699        version: Version,
700        child_req: Vec<CreateRequest>,
701    ) -> Result<AppendBatchResponse, DbErrorWrite>;
702
703    #[cfg(feature = "test")]
704    /// Get execution log.
705    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
706
707    async fn list_execution_events(
708        &self,
709        execution_id: &ExecutionId,
710        since: &Version,
711        max_length: VersionType,
712        include_backtrace_id: bool,
713    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
714
715    /// Get a single event without `backtrace_id`
716    async fn get_execution_event(
717        &self,
718        execution_id: &ExecutionId,
719        version: &Version,
720    ) -> Result<ExecutionEvent, DbErrorRead>;
721
722    async fn get_create_request(
723        &self,
724        execution_id: &ExecutionId,
725    ) -> Result<CreateRequest, DbErrorRead> {
726        let execution_event = self
727            .get_execution_event(execution_id, &Version::new(0))
728            .await?;
729        if let ExecutionEventInner::Created {
730            ffqn,
731            params,
732            parent,
733            scheduled_at,
734            component_id,
735            metadata,
736            scheduled_by,
737        } = execution_event.event
738        {
739            Ok(CreateRequest {
740                created_at: execution_event.created_at,
741                execution_id: execution_id.clone(),
742                ffqn,
743                params,
744                parent,
745                scheduled_at,
746                component_id,
747                metadata,
748                scheduled_by,
749            })
750        } else {
751            error!(%execution_id, "Execution log must start with creation");
752            Err(DbErrorRead::DbErrorGeneric(DbErrorGeneric::Uncategorized(
753                "execution log must start with creation".into(),
754            )))
755        }
756    }
757
758    async fn get_finished_result(
759        &self,
760        execution_id: &ExecutionId,
761        finished: PendingStateFinished,
762    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
763        let last_event = self
764            .get_execution_event(execution_id, &Version::new(finished.version))
765            .await?;
766        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
767            Ok(Some(result))
768        } else {
769            Ok(None)
770        }
771    }
772
773    async fn get_pending_state(
774        &self,
775        execution_id: &ExecutionId,
776    ) -> Result<PendingState, DbErrorRead>;
777
778    /// Get currently expired locks and async timers (delay requests)
779    async fn get_expired_timers(
780        &self,
781        at: DateTime<Utc>,
782    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
783
784    /// Create a new execution log
785    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
786
787    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
788    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
789    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
790    /// Implementations with no pubsub support should use polling.
791    async fn subscribe_to_next_responses(
792        &self,
793        execution_id: &ExecutionId,
794        start_idx: usize,
795        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
796    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
797
798    /// Notification mechainism with no strict guarantees for getting the finished result.
799    /// Implementations with no pubsub support should use polling.
800    async fn wait_for_finished_result(
801        &self,
802        execution_id: &ExecutionId,
803        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
804    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
805
806    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
807
808    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
809
810    /// Used by gRPC only.
811    /// Get the latest backtrace if version is not set.
812    async fn get_backtrace(
813        &self,
814        execution_id: &ExecutionId,
815        filter: BacktraceFilter,
816    ) -> Result<BacktraceInfo, DbErrorRead>;
817
818    /// Returns executions sorted in descending order.
819    /// Used by gRPC only.
820    async fn list_executions(
821        &self,
822        ffqn: Option<FunctionFqn>,
823        top_level_only: bool,
824        pagination: ExecutionListPagination,
825    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
826
827    /// Returns responses of an execution ordered as they arrived,
828    /// enabling matching each `JoinNext` to its corresponding response.
829    /// Used by gRPC only.
830    async fn list_responses(
831        &self,
832        execution_id: &ExecutionId,
833        pagination: Pagination<u32>,
834    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
835}
836
837#[derive(Clone)]
838pub enum BacktraceFilter {
839    First,
840    Last,
841    Specific(Version),
842}
843
844pub struct BacktraceInfo {
845    pub execution_id: ExecutionId,
846    pub component_id: ComponentId,
847    pub version_min_including: Version,
848    pub version_max_excluding: Version,
849    pub wasm_backtrace: WasmBacktrace,
850}
851
852#[derive(Serialize, Deserialize, Debug, Clone)]
853pub struct WasmBacktrace {
854    pub frames: Vec<FrameInfo>,
855}
856
857#[derive(Serialize, Deserialize, Debug, Clone)]
858pub struct FrameInfo {
859    pub module: String,
860    pub func_name: String,
861    pub symbols: Vec<FrameSymbol>,
862}
863
864#[derive(Serialize, Deserialize, Debug, Clone)]
865pub struct FrameSymbol {
866    pub func_name: Option<String>,
867    pub file: Option<String>,
868    pub line: Option<u32>,
869    pub col: Option<u32>,
870}
871
872mod wasm_backtrace {
873    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
874
875    impl WasmBacktrace {
876        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
877            if backtrace.frames().is_empty() {
878                None
879            } else {
880                Some(Self {
881                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
882                })
883            }
884        }
885    }
886
887    impl From<&wasmtime::FrameInfo> for FrameInfo {
888        fn from(frame: &wasmtime::FrameInfo) -> Self {
889            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
890            let mut func_name = String::new();
891            wasmtime_environ::demangle_function_name_or_index(
892                &mut func_name,
893                frame.func_name(),
894                frame.func_index() as usize,
895            )
896            .expect("writing to string must succeed");
897            Self {
898                module: module_name,
899                func_name,
900                symbols: frame
901                    .symbols()
902                    .iter()
903                    .map(std::convert::Into::into)
904                    .collect(),
905            }
906        }
907    }
908
909    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
910        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
911            let func_name = symbol.name().map(|name| {
912                let mut writer = String::new();
913                wasmtime_environ::demangle_function_name(&mut writer, name)
914                    .expect("writing to string must succeed");
915                writer
916            });
917
918            Self {
919                func_name,
920                file: symbol.file().map(ToString::to_string),
921                line: symbol.line(),
922                col: symbol.column(),
923            }
924        }
925    }
926}
927
928pub type ResponseCursorType = u32; // FIXME: Switch to u64
929
930#[derive(Debug, Clone)]
931pub struct ResponseWithCursor {
932    pub event: JoinSetResponseEventOuter,
933    pub cursor: ResponseCursorType,
934}
935
936#[derive(Debug)]
937pub struct ExecutionWithState {
938    pub execution_id: ExecutionId,
939    pub ffqn: FunctionFqn,
940    pub pending_state: PendingState,
941    pub created_at: DateTime<Utc>,
942    pub scheduled_at: DateTime<Utc>,
943}
944
945pub enum ExecutionListPagination {
946    CreatedBy(Pagination<Option<DateTime<Utc>>>),
947    ExecutionId(Pagination<Option<ExecutionId>>),
948}
949
950#[derive(Debug, Clone, Copy)]
951pub enum Pagination<T> {
952    NewerThan {
953        length: ResponseCursorType,
954        cursor: T,
955        including_cursor: bool,
956    },
957    OlderThan {
958        length: ResponseCursorType,
959        cursor: T,
960        including_cursor: bool,
961    },
962}
963impl<T> Pagination<T> {
964    pub fn length(&self) -> ResponseCursorType {
965        match self {
966            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
967        }
968    }
969    pub fn rel(&self) -> &'static str {
970        match self {
971            Pagination::NewerThan {
972                including_cursor: false,
973                ..
974            } => ">",
975            Pagination::NewerThan {
976                including_cursor: true,
977                ..
978            } => ">=",
979            Pagination::OlderThan {
980                including_cursor: false,
981                ..
982            } => "<",
983            Pagination::OlderThan {
984                including_cursor: true,
985                ..
986            } => "<=",
987        }
988    }
989    pub fn is_desc(&self) -> bool {
990        matches!(self, Pagination::OlderThan { .. })
991    }
992}
993
994#[cfg(feature = "test")]
995pub async fn wait_for_pending_state_fn<T: Debug>(
996    db_connection: &dyn DbConnection,
997    execution_id: &ExecutionId,
998    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
999    timeout: Option<Duration>,
1000) -> Result<T, DbErrorReadWithTimeout> {
1001    tracing::trace!(%execution_id, "Waiting for predicate");
1002    let fut = async move {
1003        loop {
1004            let execution_log = db_connection.get(execution_id).await?;
1005            if let Some(t) = predicate(execution_log) {
1006                tracing::debug!(%execution_id, "Found: {t:?}");
1007                return Ok(t);
1008            }
1009            tokio::time::sleep(Duration::from_millis(10)).await;
1010        }
1011    };
1012
1013    if let Some(timeout) = timeout {
1014        tokio::select! { // future's liveness: Dropping the loser immediately.
1015            res = fut => res,
1016            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
1017        }
1018    } else {
1019        fut.await
1020    }
1021}
1022
1023#[derive(Debug, Clone, PartialEq, Eq)]
1024pub enum ExpiredTimer {
1025    Lock(ExpiredLock),
1026    Delay(ExpiredDelay),
1027}
1028
1029#[derive(Debug, Clone, PartialEq, Eq)]
1030pub struct ExpiredLock {
1031    pub execution_id: ExecutionId,
1032    // Version of last `Locked` event, used to detect whether the execution made progress.
1033    pub locked_at_version: Version,
1034    pub next_version: Version,
1035    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
1036    pub intermittent_event_count: u32,
1037    pub max_retries: u32,
1038    pub retry_exp_backoff: Duration,
1039}
1040
1041#[derive(Debug, Clone, PartialEq, Eq)]
1042pub struct ExpiredDelay {
1043    pub execution_id: ExecutionId,
1044    pub join_set_id: JoinSetId,
1045    pub delay_id: DelayId,
1046}
1047
1048#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1049#[serde(tag = "type")]
1050pub enum PendingState {
1051    #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
1052    Locked {
1053        executor_id: ExecutorId,
1054        run_id: RunId,
1055        lock_expires_at: DateTime<Utc>,
1056    },
1057    #[display("PendingAt(`{scheduled_at}`)")]
1058    PendingAt { scheduled_at: DateTime<Utc> }, // e.g. created with a schedule, temporary timeout/failure
1059    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1060    /// Caused by [`HistoryEvent::JoinNext`]
1061    BlockedByJoinSet {
1062        join_set_id: JoinSetId,
1063        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
1064        lock_expires_at: DateTime<Utc>,
1065        /// Blocked by closing of the join set
1066        closing: bool,
1067    },
1068    #[display("Finished({finished})")]
1069    Finished { finished: PendingStateFinished },
1070}
1071
1072#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1073#[display("{result_kind}")]
1074pub struct PendingStateFinished {
1075    pub version: VersionType, // not Version since it must be Copy
1076    pub finished_at: DateTime<Utc>,
1077    pub result_kind: PendingStateFinishedResultKind,
1078}
1079
1080#[derive(
1081    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1082)]
1083pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1084const OK_VARIANT: &str = "ok";
1085impl FromStr for PendingStateFinishedResultKind {
1086    type Err = strum::ParseError;
1087
1088    fn from_str(s: &str) -> Result<Self, Self::Err> {
1089        if s == OK_VARIANT {
1090            Ok(PendingStateFinishedResultKind(Ok(())))
1091        } else {
1092            let err = PendingStateFinishedError::from_str(s)?;
1093            Ok(PendingStateFinishedResultKind(Err(err)))
1094        }
1095    }
1096}
1097
1098impl Display for PendingStateFinishedResultKind {
1099    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1100        match self.0 {
1101            Ok(()) => write!(f, "{OK_VARIANT}"),
1102            Err(err) => write!(f, "{err}"),
1103        }
1104    }
1105}
1106
1107impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1108    fn from(result: &SupportedFunctionReturnValue) -> Self {
1109        result.as_pending_state_finished_result()
1110    }
1111}
1112
1113#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1114#[cfg_attr(test, derive(strum::VariantArray))]
1115#[strum(serialize_all = "snake_case")]
1116pub enum PendingStateFinishedError {
1117    Timeout,
1118    ExecutionFailure,
1119    FallibleError,
1120}
1121
1122impl PendingState {
1123    pub fn can_append_lock(
1124        &self,
1125        created_at: DateTime<Utc>,
1126        executor_id: ExecutorId,
1127        run_id: RunId,
1128        lock_expires_at: DateTime<Utc>,
1129    ) -> Result<LockKind, DbErrorWritePermanent> {
1130        if lock_expires_at <= created_at {
1131            return Err(DbErrorWritePermanent::ValidationFailed(
1132                "invalid expiry date".into(),
1133            ));
1134        }
1135        match self {
1136            PendingState::PendingAt { scheduled_at } => {
1137                if *scheduled_at <= created_at {
1138                    // pending now, ok to lock
1139                    Ok(LockKind::CreatingNewLock)
1140                } else {
1141                    Err(DbErrorWritePermanent::ValidationFailed(
1142                        "cannot lock, not yet pending".into(),
1143                    ))
1144                }
1145            }
1146            PendingState::Locked {
1147                executor_id: current_pending_state_executor_id,
1148                run_id: current_pending_state_run_id,
1149                ..
1150            } => {
1151                if executor_id == *current_pending_state_executor_id
1152                    && run_id == *current_pending_state_run_id
1153                {
1154                    // Original executor is extending the lock.
1155                    Ok(LockKind::Extending)
1156                } else {
1157                    Err(DbErrorWritePermanent::CannotWrite {
1158                        reason: "cannot lock, already locked".into(),
1159                        expected_version: None,
1160                    })
1161                }
1162            }
1163            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWritePermanent::CannotWrite {
1164                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1165                expected_version: None,
1166            }),
1167            PendingState::Finished { .. } => Err(DbErrorWritePermanent::CannotWrite {
1168                reason: "already finished".into(),
1169                expected_version: None,
1170            }),
1171        }
1172    }
1173
1174    #[must_use]
1175    pub fn is_finished(&self) -> bool {
1176        matches!(self, PendingState::Finished { .. })
1177    }
1178}
1179
1180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1181pub enum LockKind {
1182    Extending,
1183    CreatingNewLock,
1184}
1185
1186pub mod http_client_trace {
1187    use chrono::{DateTime, Utc};
1188    use serde::{Deserialize, Serialize};
1189
1190    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1191    pub struct HttpClientTrace {
1192        pub req: RequestTrace,
1193        pub resp: Option<ResponseTrace>,
1194    }
1195
1196    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1197    pub struct RequestTrace {
1198        pub sent_at: DateTime<Utc>,
1199        pub uri: String,
1200        pub method: String,
1201    }
1202
1203    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1204    pub struct ResponseTrace {
1205        pub finished_at: DateTime<Utc>,
1206        pub status: Result<u16, String>,
1207    }
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212    use super::HistoryEventScheduleAt;
1213    use super::PendingStateFinished;
1214    use super::PendingStateFinishedError;
1215    use super::PendingStateFinishedResultKind;
1216    use crate::SupportedFunctionReturnValue;
1217    use chrono::DateTime;
1218    use chrono::Datelike;
1219    use chrono::Utc;
1220    use insta::assert_snapshot;
1221    use rstest::rstest;
1222    use std::time::Duration;
1223    use strum::VariantArray as _;
1224    use val_json::type_wrapper::TypeWrapper;
1225    use val_json::wast_val::WastVal;
1226    use val_json::wast_val::WastValWithType;
1227
1228    #[rstest(expected => [
1229        PendingStateFinishedResultKind(Result::Ok(())),
1230        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1231    ])]
1232    #[test]
1233    fn serde_pending_state_finished_result_kind_should_work(
1234        expected: PendingStateFinishedResultKind,
1235    ) {
1236        let ser = serde_json::to_string(&expected).unwrap();
1237        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1238        assert_eq!(expected, actual);
1239    }
1240
1241    #[rstest(result_kind => [
1242        PendingStateFinishedResultKind(Result::Ok(())),
1243        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1244    ])]
1245    #[test]
1246    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1247        let expected = PendingStateFinished {
1248            version: 0,
1249            finished_at: Utc::now(),
1250            result_kind,
1251        };
1252
1253        let ser = serde_json::to_string(&expected).unwrap();
1254        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1255        assert_eq!(expected, actual);
1256    }
1257
1258    #[test]
1259    fn join_set_deser_with_result_ok_option_none_should_work() {
1260        let expected = SupportedFunctionReturnValue::Ok {
1261            ok: Some(WastValWithType {
1262                r#type: TypeWrapper::Result {
1263                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1264                    err: Some(Box::new(TypeWrapper::String)),
1265                },
1266                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1267            }),
1268        };
1269        let json = serde_json::to_string(&expected).unwrap();
1270        assert_snapshot!(json);
1271
1272        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1273
1274        assert_eq!(expected, actual);
1275    }
1276
1277    #[test]
1278    fn verify_pending_state_finished_result_kind_serde() {
1279        let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1280            .iter()
1281            .map(|var| PendingStateFinishedResultKind(Err(*var)))
1282            .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1283            .collect();
1284        let ser = serde_json::to_string_pretty(&variants).unwrap();
1285        insta::assert_snapshot!(ser);
1286        let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1287        assert_eq!(variants, deser);
1288    }
1289
1290    #[test]
1291    fn as_date_time_should_work_with_duration_u32_max_secs() {
1292        let duration = Duration::from_secs(u64::from(u32::MAX));
1293        let schedule_at = HistoryEventScheduleAt::In(duration);
1294        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1295        assert_eq!(2106, resolved.year());
1296    }
1297
1298    const MILLIS_PER_SEC: i64 = 1000;
1299    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1300
1301    #[test]
1302    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1303        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
1304        let duration = Duration::from_secs(
1305            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1306        );
1307        let schedule_at = HistoryEventScheduleAt::In(duration);
1308        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1309    }
1310}