nominal_streaming/
client.rs

1use std::fmt::Debug;
2use std::io::Write;
3use std::sync::LazyLock;
4
5use conjure_error::Error;
6use conjure_http::client::AsyncClient;
7use conjure_http::client::AsyncRequestBody;
8use conjure_http::client::AsyncService;
9use conjure_http::private::header::CONTENT_ENCODING;
10use conjure_http::private::header::CONTENT_TYPE;
11use conjure_http::private::Request;
12use conjure_http::private::Response;
13use conjure_object::BearerToken;
14use conjure_object::ResourceIdentifier;
15use conjure_runtime::Agent;
16use conjure_runtime::BodyWriter;
17use conjure_runtime::Client;
18use conjure_runtime::Idempotency;
19use conjure_runtime::ResponseBody;
20use conjure_runtime::UserAgent;
21use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
22use nominal_api::api::rids::WorkspaceRid;
23use nominal_api::ingest::api::IngestServiceAsyncClient;
24use nominal_api::upload::api::UploadServiceAsyncClient;
25use snap::write::FrameEncoder;
26use url::Url;
27
28use crate::types::AuthProvider;
29
30pub mod conjure {
31    pub use conjure_error as error;
32    pub use conjure_http as http;
33    pub use conjure_object as object;
34    pub use conjure_runtime as runtime;
35}
36
/// Base URL of the production Nominal API.
pub const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";

/// Base URL of the staging Nominal API.
const STAGING_API_URL: &str = "https://api-staging.gov.nominal.io/api";

/// Agent name passed to `Agent::new` for every client built in this file.
const USER_AGENT: &str = "nominal-streaming";
40
41impl AuthProvider for BearerToken {
42    fn token(&self) -> Option<BearerToken> {
43        Some(self.clone())
44    }
45}
46
47#[derive(Debug, Clone)]
48pub struct TokenAndWorkspaceRid {
49    pub token: BearerToken,
50    pub workspace_rid: Option<WorkspaceRid>,
51}
52
53impl AuthProvider for TokenAndWorkspaceRid {
54    fn token(&self) -> Option<BearerToken> {
55        Some(self.token.clone())
56    }
57
58    fn workspace_rid(&self) -> Option<WorkspaceRid> {
59        self.workspace_rid.clone()
60    }
61}
62
63#[derive(Clone)]
64pub struct NominalApiClients {
65    pub streaming: Client,
66    pub upload: UploadServiceAsyncClient<Client>,
67    pub ingest: IngestServiceAsyncClient<Client>,
68}
69
70impl Debug for NominalApiClients {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("NominalApiClients")
73            .field("streaming", &"Client")
74            .field("upload", &"UploadServiceAsyncClient<Client>")
75            .field("ingest", &"IngestServiceAsyncClient<Client>")
76            .finish()
77    }
78}
79
80impl NominalApiClients {
81    pub fn from_uri(base_uri: &str) -> NominalApiClients {
82        NominalApiClients {
83            streaming: async_conjure_streaming_client(base_uri.try_into().unwrap())
84                .expect("Failed to create streaming client"),
85            upload: UploadServiceAsyncClient::new(
86                async_conjure_client("upload", base_uri.try_into().unwrap())
87                    .expect("Failed to create upload client"),
88            ),
89            ingest: IngestServiceAsyncClient::new(
90                async_conjure_client("ingest", base_uri.try_into().unwrap())
91                    .expect("Failed to create ingest client"),
92            ),
93        }
94    }
95
96    pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
97        self.streaming.send(req).await
98    }
99}
100
101pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> =
102    LazyLock::new(|| NominalApiClients::from_uri(PRODUCTION_API_URL));
103
104pub static STAGING_CLIENTS: LazyLock<NominalApiClients> =
105    LazyLock::new(|| NominalApiClients::from_uri(STAGING_API_URL));
106
107fn async_conjure_streaming_client(uri: Url) -> Result<Client, Error> {
108    Client::builder()
109        .service("core-streaming-rs")
110        .user_agent(UserAgent::new(Agent::new(
111            USER_AGENT,
112            env!("CARGO_PKG_VERSION"),
113        )))
114        .uri(uri)
115        .connect_timeout(std::time::Duration::from_secs(1))
116        .read_timeout(std::time::Duration::from_secs(2))
117        .write_timeout(std::time::Duration::from_secs(2))
118        .backoff_slot_size(std::time::Duration::from_millis(10))
119        .max_num_retries(2)
120        // enables retries for POST endpoints like the streaming ingest one
121        .idempotency(Idempotency::Always)
122        .build()
123}
124
125fn async_conjure_client(service: &'static str, uri: Url) -> Result<Client, Error> {
126    Client::builder()
127        .service(service)
128        .user_agent(UserAgent::new(Agent::new(
129            USER_AGENT,
130            env!("CARGO_PKG_VERSION"),
131        )))
132        .uri(uri)
133        .build()
134}
135
136pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
137
138pub fn encode_request<'a, 'b>(
139    write_request_bytes: Vec<u8>,
140    api_key: &'a BearerToken,
141    data_source_rid: &'a ResourceIdentifier,
142) -> std::io::Result<WriteRequest<'b>> {
143    let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
144
145    encoder.write_all(&write_request_bytes)?;
146
147    let mut request = Request::new(AsyncRequestBody::Fixed(
148        encoder.into_inner().unwrap().into(),
149    ));
150
151    let headers = request.headers_mut();
152    headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
153    headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
154
155    *request.method_mut() = conjure_http::private::http::Method::POST;
156    let mut path = conjure_http::private::UriBuilder::new();
157    path.push_literal("/storage/writer/v1/nominal");
158
159    let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
160    path.push_path_parameter(&nominal_data_source_or_dataset_rid);
161
162    *request.uri_mut() = path.build();
163    conjure_http::private::encode_header_auth(&mut request, api_key);
164    conjure_http::private::encode_empty_response_headers(&mut request);
165    request
166        .extensions_mut()
167        .insert(conjure_http::client::Endpoint::new(
168            "NominalChannelWriterService",
169            None,
170            "writeNominalBatches",
171            "/storage/writer/v1/nominal/{dataSourceRid}",
172        ));
173    Ok(request)
174}