use std::marker::Unpin;
use std::time::Duration;
use anyhow::{anyhow, Context};
use async_nats::jetstream;
use async_nats::jetstream::object_store::{self, ObjectStore};
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::{debug, error, instrument};
/// Fixed 13-second duration associated with chunked RPCs.
///
/// NOTE(review): judging by the name, this is presumably extra time added to an RPC
/// timeout when the payload has to be chunked — confirm against the callers.
pub const CHUNK_RPC_EXTRA_TIME: Duration = Duration::from_secs(13);
/// Size threshold, in bytes, used for chunking decisions in regular builds
/// (900 KiB = 921,600 bytes).
///
/// NOTE(review): presumably payloads larger than this are sent through the object
/// store instead of inline — confirm against the callers.
#[cfg(not(test))]
pub const CHUNK_THRESHOLD_BYTES: usize = 900 * 1024;

/// Size threshold, in bytes, used when compiling tests (1 KiB), which is far smaller
/// than the regular-build value.
#[cfg(test)]
pub const CHUNK_THRESHOLD_BYTES: usize = 1024;

/// Handle for storing and retrieving payloads in a JetStream object store.
///
/// Cloning produces another handle with its own copy of the lattice name and a clone
/// of the JetStream context.
#[derive(Clone, Debug)]
pub struct ChunkEndpoint {
    /// Lattice name; also used as the object store bucket name.
    lattice: String,
    /// JetStream context through which the object store is accessed.
    js: jetstream::Context,
}
impl ChunkEndpoint {
    /// Creates an endpoint for `lattice` on top of an existing JetStream context.
    ///
    /// The lattice name is copied into the endpoint; it is later used as the name of
    /// the object store bucket that holds the chunked payloads.
    #[must_use]
    pub fn new(lattice: &str, js: jetstream::Context) -> Self {
        Self {
            lattice: lattice.to_owned(),
            js,
        }
    }

    /// Creates an endpoint for `lattice` from a NATS client.
    ///
    /// When `domain` is `Some`, the JetStream context is created for that domain;
    /// otherwise the default JetStream context of the client is used.
    pub fn with_client(
        lattice: &str,
        nc: async_nats::Client,
        domain: Option<impl AsRef<str>>,
    ) -> Self {
        let js = match domain {
            Some(domain) => jetstream::with_domain(nc, domain),
            None => jetstream::new(nc),
        };
        Self::new(lattice, js)
    }

    /// Reads the complete object stored under `inv_id` and returns its bytes.
    ///
    /// After a successful read the object is deleted from the store. A failed delete
    /// is only logged, because the payload has already been retrieved.
    ///
    /// # Errors
    ///
    /// Returns an error if the object store cannot be obtained or created, if the
    /// object cannot be opened, or if reading its contents fails.
    #[instrument(level = "trace", skip(self))]
    pub async fn get_unchunkified(&self, inv_id: &str) -> anyhow::Result<Vec<u8>> {
        let store = self
            .create_or_reuse_store()
            .await
            .context("failed to get object store")?;

        debug!(invocation_id = %inv_id, "chunkify starting to receive");
        let mut object = store
            .get(inv_id)
            .await
            .context("failed to receive chunked stream")?;

        let mut payload = Vec::new();
        object
            .read_to_end(&mut payload)
            .await
            .context("failed to read chunked stream")?;

        // Best-effort cleanup: the caller still gets the data if deletion fails.
        if let Err(err) = store.delete(inv_id).await {
            error!(invocation_id = %inv_id, %err, "failed to delete chunks");
        }
        Ok(payload)
    }

    /// Reads the object stored under the response key for `inv_id` (`"{inv_id}-r"`).
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`ChunkEndpoint::get_unchunkified`].
    pub async fn get_unchunkified_response(&self, inv_id: &str) -> anyhow::Result<Vec<u8>> {
        self.get_unchunkified(&format!("{inv_id}-r")).await
    }

    /// Writes everything readable from `bytes` into the object store under `inv_id`.
    ///
    /// # Errors
    ///
    /// Returns an error if the object store cannot be obtained or created, or if the
    /// write to the store fails. The write error names the affected `inv_id`.
    #[instrument(level = "trace", skip(self, bytes))]
    pub async fn chunkify(
        &self,
        inv_id: &str,
        mut bytes: impl AsyncRead + Unpin,
    ) -> anyhow::Result<()> {
        let store = self.create_or_reuse_store().await?;
        debug!(invocation_id = %inv_id, "chunkify starting to send");
        let info = store
            .put(inv_id, &mut bytes)
            .await
            .map_err(|e| anyhow!(e))
            // `context` takes a plain value and does not interpolate, so the message
            // has to be built with `format!` for the id to actually appear in it.
            .with_context(|| format!("error when writing chunkified data for {inv_id}"))?;
        debug!(?info, invocation_id = %inv_id, "chunkify completed writing");
        Ok(())
    }

    /// Writes `bytes` into the object store under the response key for `inv_id`
    /// (`"{inv_id}-r"`).
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`ChunkEndpoint::chunkify`].
    pub async fn chunkify_response(
        &self,
        inv_id: &str,
        bytes: impl AsyncRead + Unpin,
    ) -> anyhow::Result<()> {
        self.chunkify(&format!("{inv_id}-r"), bytes).await
    }

    /// Returns the object store whose bucket is named after the lattice, creating it
    /// with the default configuration when it cannot be looked up.
    ///
    /// Any lookup error is treated as "bucket missing" and triggers a creation
    /// attempt; the lookup error itself is only logged at debug level.
    async fn create_or_reuse_store(&self) -> anyhow::Result<ObjectStore> {
        match self.js.get_object_store(&self.lattice).await {
            Ok(store) => Ok(store),
            Err(err) => {
                debug!(bucket = %self.lattice, %err, "object store lookup failed, creating bucket");
                self.js
                    .create_object_store(object_store::Config {
                        bucket: self.lattice.clone(),
                        ..Default::default()
                    })
                    .await
                    .map_err(|e| anyhow!(e))
                    .context("Failed to create chunking store")
            }
        }
    }
}