Skip to main content

nominal_streaming/
client.rs

1use std::fmt::Debug;
2use std::io::Write;
3use std::sync::Arc;
4use std::sync::LazyLock;
5
6use conjure_error::Error;
7use conjure_http::client::AsyncClient;
8use conjure_http::client::AsyncRequestBody;
9use conjure_http::client::AsyncService;
10use conjure_http::client::ConjureRuntime;
11use conjure_http::private::header::CONTENT_ENCODING;
12use conjure_http::private::header::CONTENT_TYPE;
13use conjure_http::private::Request;
14use conjure_http::private::Response;
15use conjure_object::BearerToken;
16use conjure_object::ResourceIdentifier;
17use conjure_runtime_rustls_platform_verifier::Agent;
18use conjure_runtime_rustls_platform_verifier::BodyWriter;
19use conjure_runtime_rustls_platform_verifier::Client;
20use conjure_runtime_rustls_platform_verifier::Idempotency;
21use conjure_runtime_rustls_platform_verifier::ResponseBody;
22use conjure_runtime_rustls_platform_verifier::UserAgent;
23use nominal_api::clients::ingest::api::AsyncIngestServiceClient;
24use nominal_api::clients::upload::api::AsyncUploadServiceClient;
25use nominal_api::objects::api::rids::NominalDataSourceOrDatasetRid;
26use nominal_api::objects::api::rids::WorkspaceRid;
27use snap::write::FrameEncoder;
28use url::Url;
29
30use crate::types::AuthProvider;
31
32pub mod conjure {
33    pub use conjure_error as error;
34    pub use conjure_http as http;
35    pub use conjure_object as object;
36    pub use conjure_runtime_rustls_platform_verifier as runtime;
37}
38
39/// The URL that points toward's Nominal's default production deployment.
40pub const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";
41
42const USER_AGENT: &str = "nominal-streaming";
43
44impl AuthProvider for BearerToken {
45    fn token(&self) -> Option<BearerToken> {
46        Some(self.clone())
47    }
48}
49
50#[derive(Debug, Clone)]
51pub struct TokenAndWorkspaceRid {
52    pub token: BearerToken,
53    pub workspace_rid: Option<WorkspaceRid>,
54}
55
56impl AuthProvider for TokenAndWorkspaceRid {
57    fn token(&self) -> Option<BearerToken> {
58        Some(self.token.clone())
59    }
60
61    fn workspace_rid(&self) -> Option<WorkspaceRid> {
62        self.workspace_rid.clone()
63    }
64}
65
66#[derive(Clone)]
67pub struct NominalApiClients {
68    pub streaming: Client,
69    pub upload: AsyncUploadServiceClient<Client>,
70    pub ingest: AsyncIngestServiceClient<Client>,
71}
72
73impl Debug for NominalApiClients {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("NominalApiClients")
76            .field("streaming", &"Client")
77            .field("upload", &"UploadServiceAsyncClient<Client>")
78            .field("ingest", &"IngestServiceAsyncClient<Client>")
79            .finish()
80    }
81}
82
83impl NominalApiClients {
84    pub fn from_uri(base_uri: &str) -> Self {
85        let base_uri = base_uri.parse::<url::Url>().unwrap();
86        let streaming = async_conjure_streaming_client(base_uri.clone())
87            .expect("Failed to create streaming client");
88        let services = async_conjure_client("upload-ingest", base_uri)
89            .expect("Failed to create upload/ingest client");
90        Self::from_conjure_clients(streaming, services, &Arc::new(ConjureRuntime::default()))
91    }
92
93    /// NOTE: the conjure client type is a shared handle, and cheap to clone.
94    pub fn from_conjure_clients(
95        streaming: Client,
96        services: Client,
97        runtime: &Arc<ConjureRuntime>,
98    ) -> Self {
99        Self {
100            streaming,
101            upload: AsyncUploadServiceClient::new(services.clone(), runtime),
102            ingest: AsyncIngestServiceClient::new(services, runtime),
103        }
104    }
105
106    pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
107        self.streaming.send(req).await
108    }
109}
110
111pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> =
112    LazyLock::new(|| NominalApiClients::from_uri(PRODUCTION_API_URL));
113
114pub fn async_conjure_streaming_client(uri: Url) -> Result<Client, Error> {
115    Client::builder()
116        .service("core-streaming-rs")
117        .user_agent(UserAgent::new(Agent::new(
118            USER_AGENT,
119            env!("CARGO_PKG_VERSION"),
120        )))
121        .uri(uri)
122        .connect_timeout(std::time::Duration::from_secs(1))
123        .read_timeout(std::time::Duration::from_secs(2))
124        .write_timeout(std::time::Duration::from_secs(2))
125        .backoff_slot_size(std::time::Duration::from_millis(10))
126        .max_num_retries(2)
127        // enables retries for POST endpoints like the streaming ingest one
128        .idempotency(Idempotency::Always)
129        .build()
130}
131
132pub fn async_conjure_client(service: &'static str, uri: Url) -> Result<Client, Error> {
133    Client::builder()
134        .service(service)
135        .user_agent(UserAgent::new(Agent::new(
136            USER_AGENT,
137            env!("CARGO_PKG_VERSION"),
138        )))
139        .uri(uri)
140        .build()
141}
142
143pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
144
145pub fn encode_request<'a, 'b>(
146    write_request_bytes: Vec<u8>,
147    api_key: &'a BearerToken,
148    data_source_rid: &'a ResourceIdentifier,
149) -> std::io::Result<WriteRequest<'b>> {
150    let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
151
152    encoder.write_all(&write_request_bytes)?;
153
154    let mut request = Request::new(AsyncRequestBody::Fixed(
155        encoder.into_inner().unwrap().into(),
156    ));
157
158    let headers = request.headers_mut();
159    headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
160    headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
161
162    *request.method_mut() = conjure_http::private::http::Method::POST;
163    let mut path = conjure_http::private::UriBuilder::new();
164    path.push_literal("/storage/writer/v1/nominal");
165
166    let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
167    path.push_path_parameter(&nominal_data_source_or_dataset_rid);
168
169    *request.uri_mut() = path.build();
170    conjure_http::private::encode_header_auth(&mut request, api_key);
171    request
172        .extensions_mut()
173        .insert(conjure_http::client::Endpoint::new(
174            "NominalChannelWriterService",
175            None,
176            "writeNominalBatches",
177            "/storage/writer/v1/nominal/{dataSourceRid}",
178        ));
179    Ok(request)
180}