Skip to main content

nominal_streaming/
client.rs

1use std::fmt::Debug;
2use std::io::Write;
3use std::sync::LazyLock;
4
5use conjure_error::Error;
6use conjure_http::client::AsyncClient;
7use conjure_http::client::AsyncRequestBody;
8use conjure_http::client::AsyncService;
9use conjure_http::private::header::CONTENT_ENCODING;
10use conjure_http::private::header::CONTENT_TYPE;
11use conjure_http::private::Request;
12use conjure_http::private::Response;
13use conjure_object::BearerToken;
14use conjure_object::ResourceIdentifier;
15use conjure_runtime_rustls_platform_verifier::conjure_runtime::Agent;
16use conjure_runtime_rustls_platform_verifier::conjure_runtime::BodyWriter;
17use conjure_runtime_rustls_platform_verifier::conjure_runtime::Client;
18use conjure_runtime_rustls_platform_verifier::conjure_runtime::Idempotency;
19use conjure_runtime_rustls_platform_verifier::conjure_runtime::UserAgent;
20use conjure_runtime_rustls_platform_verifier::PlatformVerifierClient;
21use conjure_runtime_rustls_platform_verifier::ResponseBody;
22use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
23use nominal_api::api::rids::WorkspaceRid;
24use nominal_api::ingest::api::IngestServiceAsyncClient;
25use nominal_api::upload::api::UploadServiceAsyncClient;
26use snap::write::FrameEncoder;
27use url::Url;
28
29use crate::types::AuthProvider;
30
31pub mod conjure {
32    pub use conjure_error as error;
33    pub use conjure_http as http;
34    pub use conjure_object as object;
35    pub use conjure_runtime_rustls_platform_verifier::conjure_runtime as runtime;
36}
37
38/// The URL that points toward's Nominal's default production deployment.
39pub const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";
40
41const USER_AGENT: &str = "nominal-streaming";
42
43impl AuthProvider for BearerToken {
44    fn token(&self) -> Option<BearerToken> {
45        Some(self.clone())
46    }
47}
48
49#[derive(Debug, Clone)]
50pub struct TokenAndWorkspaceRid {
51    pub token: BearerToken,
52    pub workspace_rid: Option<WorkspaceRid>,
53}
54
55impl AuthProvider for TokenAndWorkspaceRid {
56    fn token(&self) -> Option<BearerToken> {
57        Some(self.token.clone())
58    }
59
60    fn workspace_rid(&self) -> Option<WorkspaceRid> {
61        self.workspace_rid.clone()
62    }
63}
64
65#[derive(Clone)]
66pub struct NominalApiClients {
67    pub streaming: PlatformVerifierClient,
68    pub upload: UploadServiceAsyncClient<PlatformVerifierClient>,
69    pub ingest: IngestServiceAsyncClient<PlatformVerifierClient>,
70}
71
72impl Debug for NominalApiClients {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("NominalApiClients")
75            .field("streaming", &"Client")
76            .field("upload", &"UploadServiceAsyncClient<Client>")
77            .field("ingest", &"IngestServiceAsyncClient<Client>")
78            .finish()
79    }
80}
81
82impl NominalApiClients {
83    pub fn from_uri(base_uri: &str) -> Self {
84        let base_uri = base_uri.parse::<url::Url>().unwrap();
85        let streaming = async_conjure_streaming_client(base_uri.clone())
86            .expect("Failed to create streaming client");
87        let services = async_conjure_client("upload-ingest", base_uri)
88            .expect("Failed to create upload/ingest client");
89        Self::from_conjure_clients(streaming, services)
90    }
91
92    /// NOTE: the conjure client type is a shared handle, and cheap to clone.
93    pub fn from_conjure_clients(
94        streaming: PlatformVerifierClient,
95        services: PlatformVerifierClient,
96    ) -> Self {
97        Self {
98            streaming,
99            upload: UploadServiceAsyncClient::new(services.clone()),
100            ingest: IngestServiceAsyncClient::new(services),
101        }
102    }
103
104    pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
105        self.streaming.send(req).await
106    }
107}
108
109pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> =
110    LazyLock::new(|| NominalApiClients::from_uri(PRODUCTION_API_URL));
111
112pub fn async_conjure_streaming_client(uri: Url) -> Result<PlatformVerifierClient, Error> {
113    Client::builder()
114        .service("core-streaming-rs")
115        .user_agent(UserAgent::new(Agent::new(
116            USER_AGENT,
117            env!("CARGO_PKG_VERSION"),
118        )))
119        .uri(uri)
120        .connect_timeout(std::time::Duration::from_secs(1))
121        .read_timeout(std::time::Duration::from_secs(2))
122        .write_timeout(std::time::Duration::from_secs(2))
123        .backoff_slot_size(std::time::Duration::from_millis(10))
124        .max_num_retries(2)
125        // enables retries for POST endpoints like the streaming ingest one
126        .idempotency(Idempotency::Always)
127        .raw_client_builder(conjure_runtime_rustls_platform_verifier::RawClientBuilder)
128        .build()
129}
130
131pub fn async_conjure_client(
132    service: &'static str,
133    uri: Url,
134) -> Result<PlatformVerifierClient, Error> {
135    Client::builder()
136        .service(service)
137        .user_agent(UserAgent::new(Agent::new(
138            USER_AGENT,
139            env!("CARGO_PKG_VERSION"),
140        )))
141        .uri(uri)
142        .raw_client_builder(conjure_runtime_rustls_platform_verifier::RawClientBuilder)
143        .build()
144}
145
146pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
147
148pub fn encode_request<'a, 'b>(
149    write_request_bytes: Vec<u8>,
150    api_key: &'a BearerToken,
151    data_source_rid: &'a ResourceIdentifier,
152) -> std::io::Result<WriteRequest<'b>> {
153    let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
154
155    encoder.write_all(&write_request_bytes)?;
156
157    let mut request = Request::new(AsyncRequestBody::Fixed(
158        encoder.into_inner().unwrap().into(),
159    ));
160
161    let headers = request.headers_mut();
162    headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
163    headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
164
165    *request.method_mut() = conjure_http::private::http::Method::POST;
166    let mut path = conjure_http::private::UriBuilder::new();
167    path.push_literal("/storage/writer/v1/nominal");
168
169    let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
170    path.push_path_parameter(&nominal_data_source_or_dataset_rid);
171
172    *request.uri_mut() = path.build();
173    conjure_http::private::encode_header_auth(&mut request, api_key);
174    conjure_http::private::encode_empty_response_headers(&mut request);
175    request
176        .extensions_mut()
177        .insert(conjure_http::client::Endpoint::new(
178            "NominalChannelWriterService",
179            None,
180            "writeNominalBatches",
181            "/storage/writer/v1/nominal/{dataSourceRid}",
182        ));
183    Ok(request)
184}