nominal-streaming 0.8.0

Library for durable, low-latency streaming into Nominal Core
Documentation
use std::fmt::Debug;
use std::io::Write;
use std::sync::Arc;
use std::sync::LazyLock;

use conjure_error::Error;
use conjure_http::client::AsyncClient;
use conjure_http::client::AsyncRequestBody;
use conjure_http::client::AsyncService;
use conjure_http::client::ConjureRuntime;
use conjure_http::private::header::CONTENT_ENCODING;
use conjure_http::private::header::CONTENT_TYPE;
use conjure_http::private::Request;
use conjure_http::private::Response;
use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use conjure_runtime_rustls_platform_verifier::Agent;
use conjure_runtime_rustls_platform_verifier::BodyWriter;
use conjure_runtime_rustls_platform_verifier::Client;
use conjure_runtime_rustls_platform_verifier::Idempotency;
use conjure_runtime_rustls_platform_verifier::ResponseBody;
use conjure_runtime_rustls_platform_verifier::UserAgent;
use nominal_api::clients::ingest::api::AsyncIngestServiceClient;
use nominal_api::clients::upload::api::AsyncUploadServiceClient;
use nominal_api::objects::api::rids::NominalDataSourceOrDatasetRid;
use nominal_api::objects::api::rids::WorkspaceRid;
use snap::write::FrameEncoder;
use url::Url;

use crate::types::AuthProvider;

pub mod conjure {
    pub use conjure_error as error;
    pub use conjure_http as http;
    pub use conjure_object as object;
    pub use conjure_runtime_rustls_platform_verifier as runtime;
}

/// The URL that points toward's Nominal's default production deployment.
pub const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";

const USER_AGENT: &str = "nominal-streaming";

impl AuthProvider for BearerToken {
    fn token(&self) -> Option<BearerToken> {
        Some(self.clone())
    }
}

#[derive(Debug, Clone)]
pub struct TokenAndWorkspaceRid {
    pub token: BearerToken,
    pub workspace_rid: Option<WorkspaceRid>,
}

impl AuthProvider for TokenAndWorkspaceRid {
    fn token(&self) -> Option<BearerToken> {
        Some(self.token.clone())
    }

    fn workspace_rid(&self) -> Option<WorkspaceRid> {
        self.workspace_rid.clone()
    }
}

#[derive(Clone)]
pub struct NominalApiClients {
    pub streaming: Client,
    pub upload: AsyncUploadServiceClient<Client>,
    pub ingest: AsyncIngestServiceClient<Client>,
}

impl Debug for NominalApiClients {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalApiClients")
            .field("streaming", &"Client")
            .field("upload", &"UploadServiceAsyncClient<Client>")
            .field("ingest", &"IngestServiceAsyncClient<Client>")
            .finish()
    }
}

impl NominalApiClients {
    pub fn from_uri(base_uri: &str) -> Self {
        let base_uri = base_uri.parse::<url::Url>().unwrap();
        let streaming = async_conjure_streaming_client(base_uri.clone())
            .expect("Failed to create streaming client");
        let services = async_conjure_client("upload-ingest", base_uri)
            .expect("Failed to create upload/ingest client");
        Self::from_conjure_clients(streaming, services, &Arc::new(ConjureRuntime::default()))
    }

    /// NOTE: the conjure client type is a shared handle, and cheap to clone.
    pub fn from_conjure_clients(
        streaming: Client,
        services: Client,
        runtime: &Arc<ConjureRuntime>,
    ) -> Self {
        Self {
            streaming,
            upload: AsyncUploadServiceClient::new(services.clone(), runtime),
            ingest: AsyncIngestServiceClient::new(services, runtime),
        }
    }

    pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
        self.streaming.send(req).await
    }
}

pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> =
    LazyLock::new(|| NominalApiClients::from_uri(PRODUCTION_API_URL));

pub fn async_conjure_streaming_client(uri: Url) -> Result<Client, Error> {
    Client::builder()
        .service("core-streaming-rs")
        .user_agent(UserAgent::new(Agent::new(
            USER_AGENT,
            env!("CARGO_PKG_VERSION"),
        )))
        .uri(uri)
        .connect_timeout(std::time::Duration::from_secs(1))
        .read_timeout(std::time::Duration::from_secs(2))
        .write_timeout(std::time::Duration::from_secs(2))
        .backoff_slot_size(std::time::Duration::from_millis(10))
        .max_num_retries(2)
        // enables retries for POST endpoints like the streaming ingest one
        .idempotency(Idempotency::Always)
        .build()
}

pub fn async_conjure_client(service: &'static str, uri: Url) -> Result<Client, Error> {
    Client::builder()
        .service(service)
        .user_agent(UserAgent::new(Agent::new(
            USER_AGENT,
            env!("CARGO_PKG_VERSION"),
        )))
        .uri(uri)
        .build()
}

pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;

pub fn encode_request<'a, 'b>(
    write_request_bytes: Vec<u8>,
    api_key: &'a BearerToken,
    data_source_rid: &'a ResourceIdentifier,
) -> std::io::Result<WriteRequest<'b>> {
    let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));

    encoder.write_all(&write_request_bytes)?;

    let mut request = Request::new(AsyncRequestBody::Fixed(
        encoder.into_inner().unwrap().into(),
    ));

    let headers = request.headers_mut();
    headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
    headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());

    *request.method_mut() = conjure_http::private::http::Method::POST;
    let mut path = conjure_http::private::UriBuilder::new();
    path.push_literal("/storage/writer/v1/nominal");

    let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
    path.push_path_parameter(&nominal_data_source_or_dataset_rid);

    *request.uri_mut() = path.build();
    conjure_http::private::encode_header_auth(&mut request, api_key);
    request
        .extensions_mut()
        .insert(conjure_http::client::Endpoint::new(
            "NominalChannelWriterService",
            None,
            "writeNominalBatches",
            "/storage/writer/v1/nominal/{dataSourceRid}",
        ));
    Ok(request)
}