feldera-types 0.275.0

Public API types for Feldera
Documentation
use crate::error::ErrorResponse;
use actix_web::body::BoxBody;
use actix_web::http::StatusCode;
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
use bytemuck::NoUninit;
use clap::ValueEnum;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Display;
use utoipa::ToSchema;

/// Runtime status of the pipeline.
///
/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
/// determined by the pipeline and taken over by the runner.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema, NoUninit)]
// No explicit discriminants are given, so with `#[repr(u8)]` each variant's numeric value is its
// position in the declaration below (starting at 0). Reordering or inserting variants therefore
// changes those values.
// NOTE(review): the `///` texts below are presumably consumed by the `ToSchema` derive as schema
// descriptions, so rewording them may change generated output — confirm before editing.
#[repr(u8)]
pub enum RuntimeStatus {
    /// The runner was unable to determine the pipeline runtime status. This status is never
    /// returned by the pipeline endpoint itself, but only determined by the runner.
    ///
    /// It can notably occur in two scenarios:
    /// 1. The runner is unable to (in time) receive a response for its sent request to the
    ///    pipeline `/status` endpoint, or it is unable to parse the response.
    /// 2. The runner received back a `503 Service Unavailable` as a response to the request.
    ///    This can occur for example if the pipeline is unable to acquire a lock necessary to
    ///    determine whether it is in any of the other runtime statuses.
    Unavailable,

    /// The pipeline is waiting for initialization instructions from the
    /// coordinator.
    Coordination,

    /// The pipeline is constantly pulling the latest checkpoint from S3 but not processing any inputs.
    Standby,

    /// The input and output connectors are establishing connections to their data sources and sinks
    /// respectively.
    Initializing,

    /// The pipeline was modified since the last checkpoint. User approval is required before
    /// bootstrapping can proceed.
    AwaitingApproval,

    /// The pipeline was modified since the last checkpoint, and is currently bootstrapping modified
    /// views.
    Bootstrapping,

    /// Input records that were stored in the journal but were not yet processed, are being
    /// processed first.
    Replaying,

    /// The input connectors are paused.
    Paused,

    /// The input connectors are running.
    Running,

    /// The pipeline finished checkpointing and pausing.
    Suspended,
}

impl From<RuntimeDesiredStatus> for RuntimeStatus {
    fn from(value: RuntimeDesiredStatus) -> Self {
        match value {
            RuntimeDesiredStatus::Unavailable => Self::Unavailable,
            RuntimeDesiredStatus::Coordination => Self::Coordination,
            RuntimeDesiredStatus::Standby => Self::Standby,
            RuntimeDesiredStatus::Paused => Self::Paused,
            RuntimeDesiredStatus::Running => Self::Running,
            RuntimeDesiredStatus::Suspended => Self::Suspended,
        }
    }
}

// Desired runtime status of a pipeline, as opposed to the status it currently reports
// (`RuntimeStatus`). Every variant converts into the `RuntimeStatus` variant of the same name,
// and `may_transition_to` / `may_transition_to_at_startup` decide which changes between
// desired statuses are accepted.
//
// The derived serde representation uses the variant names as written (e.g. `Paused`); the
// `snake_case_runtime_desired_status` module provides the `paused`-style alternative.
//
// NOTE(review): plain `//` comments are used here deliberately. `///` doc comments on this type
// would presumably be picked up by the `ToSchema` and `ValueEnum` derives (as schema
// descriptions and CLI help text) and change generated output — confirm before promoting these
// to rustdoc.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, ValueEnum)]
pub enum RuntimeDesiredStatus {
    // Counterpart of `RuntimeStatus::Unavailable`.
    Unavailable,
    // Counterpart of `RuntimeStatus::Coordination`; always an acceptable target at startup.
    Coordination,
    // Counterpart of `RuntimeStatus::Standby`; may move on to `Paused` or `Running`.
    Standby,
    // Counterpart of `RuntimeStatus::Paused`; may move on to `Running` or `Suspended`.
    Paused,
    // Counterpart of `RuntimeStatus::Running`; may move on to `Paused` or `Suspended`.
    Running,
    // Counterpart of `RuntimeStatus::Suspended`; at startup it may only move to `Paused`,
    // `Running` or `Coordination`.
    Suspended,
}

impl RuntimeDesiredStatus {
    /// Returns whether the desired status may change from `self` to `target` while the
    /// pipeline is running.
    ///
    /// Staying in the same status is always permitted. Apart from that, the accepted changes
    /// are:
    ///
    /// * `Standby` to `Paused` or `Running`,
    /// * `Paused` to `Running` or `Suspended`,
    /// * `Running` to `Paused` or `Suspended`.
    ///
    /// Every other combination is rejected.
    pub fn may_transition_to(&self, target: Self) -> bool {
        let current = *self;
        if current == target {
            // A no-op "transition" is always fine.
            return true;
        }

        matches!(
            (current, target),
            (Self::Standby, Self::Paused | Self::Running)
                | (Self::Paused, Self::Running | Self::Suspended)
                | (Self::Running, Self::Paused | Self::Suspended)
        )
    }

    /// Returns whether the desired status may change from `self` to `target` when the
    /// pipeline starts up.
    ///
    /// The rules are checked in this order:
    ///
    /// 1. `Coordination` is accepted as a target from any status.
    /// 2. From `Suspended`, only `Paused` and `Running` are accepted; in particular, remaining
    ///    `Suspended` is rejected.
    /// 3. Otherwise the answer is the same as [`Self::may_transition_to`].
    pub fn may_transition_to_at_startup(&self, target: Self) -> bool {
        if target == Self::Coordination {
            return true;
        }

        if *self == Self::Suspended {
            // A suspended pipeline must transition to "paused" or "running".
            return matches!(target, Self::Paused | Self::Running);
        }

        self.may_transition_to(target)
    }
}

/// Serde adapter that reads and writes `RuntimeDesiredStatus` using snake-case names.
///
/// Parts of our JSON interface spell status names capitalized (e.g. `Paused`), while other
/// parts spell them in snake case (e.g. `paused`). To get the latter with `serde`, name this
/// module in the field declaration, e.g.:
///
/// ```ignore
/// #[serde(with = "feldera_types::runtime_status::snake_case_runtime_desired_status")]
/// ```
pub mod snake_case_runtime_desired_status {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    use crate::runtime_status::RuntimeDesiredStatus;

    /// Private mirror of `RuntimeDesiredStatus` whose only job is to carry the
    /// `rename_all = "snake_case"` serde attribute.
    #[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
    #[serde(rename_all = "snake_case")]
    enum SnakeRuntimeDesiredStatus {
        Unavailable,
        Coordination,
        Standby,
        Paused,
        Running,
        Suspended,
    }

    /// Public status to snake-case mirror, variant by variant.
    impl From<RuntimeDesiredStatus> for SnakeRuntimeDesiredStatus {
        fn from(status: RuntimeDesiredStatus) -> Self {
            match status {
                RuntimeDesiredStatus::Unavailable => Self::Unavailable,
                RuntimeDesiredStatus::Coordination => Self::Coordination,
                RuntimeDesiredStatus::Standby => Self::Standby,
                RuntimeDesiredStatus::Paused => Self::Paused,
                RuntimeDesiredStatus::Running => Self::Running,
                RuntimeDesiredStatus::Suspended => Self::Suspended,
            }
        }
    }

    /// Snake-case mirror back to the public status, variant by variant.
    impl From<SnakeRuntimeDesiredStatus> for RuntimeDesiredStatus {
        fn from(status: SnakeRuntimeDesiredStatus) -> Self {
            match status {
                SnakeRuntimeDesiredStatus::Unavailable => Self::Unavailable,
                SnakeRuntimeDesiredStatus::Coordination => Self::Coordination,
                SnakeRuntimeDesiredStatus::Standby => Self::Standby,
                SnakeRuntimeDesiredStatus::Paused => Self::Paused,
                SnakeRuntimeDesiredStatus::Running => Self::Running,
                SnakeRuntimeDesiredStatus::Suspended => Self::Suspended,
            }
        }
    }

    /// Serializes `value` through the snake-case mirror enum.
    pub fn serialize<S>(value: &RuntimeDesiredStatus, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mirrored: SnakeRuntimeDesiredStatus = (*value).into();
        mirrored.serialize(serializer)
    }

    /// Deserializes a snake-case status name and converts it to the public enum.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<RuntimeDesiredStatus, D::Error>
    where
        D: Deserializer<'de>,
    {
        let mirrored = SnakeRuntimeDesiredStatus::deserialize(deserializer)?;
        Ok(mirrored.into())
    }
}

// Policy value with three options: `Allow`, `Reject` and `AwaitApproval` (the default).
//
// Serialized by serde in snake case (`allow`, `reject`, `await_approval`), which are the same
// spellings the string conversions and the `Display` impl in this file use.
//
// NOTE(review): presumably this controls whether a pipeline that was modified since its last
// checkpoint may bootstrap (compare `RuntimeStatus::AwaitingApproval`) — confirm against the
// users of this type.
// NOTE(review): plain `//` comments are used deliberately; `///` doc comments would presumably
// be picked up by the `ToSchema` derive as schema descriptions and change generated output.
#[derive(
    Debug, Default, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, NoUninit,
)]
#[repr(u8)]
#[serde(rename_all = "snake_case")]
pub enum BootstrapPolicy {
    // Spelled `allow` in text form.
    Allow,
    // Spelled `reject` in text form.
    Reject,
    // Spelled `await_approval` in text form; this is the `Default` value.
    #[default]
    AwaitApproval,
}

/// Parses an optional policy name.
///
/// A missing value (`None`) yields `AwaitApproval`. A present value must be exactly `allow`,
/// `reject` or `await_approval` (case-sensitive); anything else is rejected.
///
/// # Errors
///
/// Returns `Err(())` when the string is present but is not one of the three known names.
impl TryFrom<Option<String>> for BootstrapPolicy {
    type Error = ();

    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
        match value {
            // Absent means "not configured": fall back to awaiting approval.
            None => Ok(Self::AwaitApproval),
            Some(name) => match name.as_str() {
                "allow" => Ok(Self::Allow),
                "reject" => Ok(Self::Reject),
                "await_approval" => Ok(Self::AwaitApproval),
                _ => Err(()),
            },
        }
    }
}

/// Infallible-looking conversion from a policy name.
///
/// Accepts exactly `allow`, `reject` and `await_approval` (case-sensitive).
///
/// # Panics
///
/// Panics if `value` is not one of the three known names. Use the
/// `TryFrom<Option<String>>` conversion when the input is not known to be valid.
impl From<String> for BootstrapPolicy {
    fn from(value: String) -> Self {
        if value == "allow" {
            Self::Allow
        } else if value == "reject" {
            Self::Reject
        } else if value == "await_approval" {
            Self::AwaitApproval
        } else {
            panic!("Invalid 'bootstrap_policy' value: {value}")
        }
    }
}

impl Display for BootstrapPolicy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            BootstrapPolicy::Allow => "allow",
            BootstrapPolicy::Reject => "reject",
            BootstrapPolicy::AwaitApproval => "await_approval",
        };
        write!(f, "{s}")
    }
}

/// Runtime status of a pipeline together with free-form details and the desired runtime status.
///
/// The `Responder` and `From<ExtendedRuntimeStatus> for HttpResponse` implementations in this
/// file send this value as a JSON body with status `200 OK`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExtendedRuntimeStatus {
    /// Runtime status of the pipeline.
    pub runtime_status: RuntimeStatus,

    /// Human-readable details about the runtime status. Its content can contain for instance an
    /// explanation why it is in this status and any other additional information about it (e.g.,
    /// progress).
    ///
    /// Typed as an arbitrary JSON value, so no particular shape is enforced here.
    pub runtime_status_details: serde_json::Value,

    /// Runtime desired status of the pipeline.
    pub runtime_desired_status: RuntimeDesiredStatus,
}

impl Responder for ExtendedRuntimeStatus {
    type Body = BoxBody;

    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
        HttpResponseBuilder::new(StatusCode::OK).json(self)
    }
}

impl From<ExtendedRuntimeStatus> for HttpResponse<BoxBody> {
    fn from(value: ExtendedRuntimeStatus) -> Self {
        HttpResponseBuilder::new(StatusCode::OK).json(value)
    }
}

/// Error returned by the pipeline `/status` endpoint.
///
/// Pairs the HTTP status code to answer with and the error payload. In its own serde
/// representation the status code is stored as a plain number (see the `status_code` field).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtendedRuntimeStatusError {
    /// Status code. Returning anything except `503 Service Unavailable` will cause the runner to
    /// forcefully stop the pipeline.
    ///
    /// (De)serialized as its numeric `u16` value through the private `status_code` module;
    /// deserialization fails if the number is not accepted by `StatusCode::from_u16`.
    #[serde(with = "status_code")]
    pub status_code: StatusCode,

    /// Error response. This is the value that the `ResponseError` implementation sends as the
    /// JSON body of the HTTP response.
    pub error: ErrorResponse,
}

/// Serde adapter that represents a `StatusCode` as its numeric `u16` value.
mod status_code {
    use actix_web::http::StatusCode;
    use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};

    /// Serializes the status code as a `u16`.
    pub fn serialize<S>(value: &StatusCode, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let numeric: u16 = value.as_u16();
        numeric.serialize(serializer)
    }

    /// Deserializes a `u16` and converts it into a `StatusCode`.
    ///
    /// A number that `StatusCode::from_u16` rejects is reported as a custom deserialization
    /// error.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<StatusCode, D::Error>
    where
        D: Deserializer<'de>,
    {
        u16::deserialize(deserializer)
            .and_then(|numeric| StatusCode::from_u16(numeric).map_err(D::Error::custom))
    }
}

/// Formats the error as `<status code>: <error>`, where the status code uses its `Display`
/// form and the error payload uses its `Debug` form.
impl Display for ExtendedRuntimeStatusError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { status_code, error } = self;
        write!(f, "{status_code}: {error:?}")
    }
}

impl ResponseError for ExtendedRuntimeStatusError {
    fn status_code(&self) -> StatusCode {
        self.status_code
    }

    fn error_response(&self) -> HttpResponse<BoxBody> {
        HttpResponseBuilder::new(self.status_code()).json(self.error.clone())
    }
}