use crate::header::HeaderMap;
use crate::jetstream::{
DateTime, DiscardPolicy, JetStream, PushSubscription, StorageType, StreamConfig,
SubscribeOptions,
};
use crate::Message;
use base64::URL_SAFE;
use lazy_static::lazy_static;
use regex::Regex;
use ring::digest::SHA256;
use serde::{Deserialize, Serialize};
use std::cmp;
use std::io::{self, ErrorKind};
use std::time::Duration;
use time::serde::rfc3339;
use time::OffsetDateTime;
/// Size in bytes of the buffer used to read object data while uploading.
/// Every chunk message published by `ObjectStore::put` carries at most this
/// many bytes.
const DEFAULT_CHUNK_SIZE: usize = 128 * 1024;
/// Name of the header attached to every object metadata message.
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value sent in the `Nats-Rollup` header.
// NOTE(review): presumably asks the server to drop earlier messages on the
// same subject only (as opposed to the whole stream) — confirm against the
// JetStream documentation.
const ROLLUP_SUBJECT: &str = "sub";
// Name-validation patterns, compiled once on first use.
lazy_static! {
// Bucket names: one or more ASCII letters, digits, `_` or `-`, nothing else.
static ref BUCKET_NAME_RE: Regex = Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap();
// Object names: one or more ASCII letters, digits, or any of `-`, `/`, `_`,
// `=`, `.`. Callers in this file apply it to the base64-encoded name.
static ref OBJECT_NAME_RE: Regex = Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap();
}
fn is_valid_bucket_name(bucket_name: &str) -> bool {
BUCKET_NAME_RE.is_match(bucket_name)
}
/// Returns `true` when `object_name` is non-empty, neither starts nor ends
/// with a `.`, and matches `OBJECT_NAME_RE`.
fn is_valid_object_name(object_name: &str) -> bool {
    let has_dot_at_edge = object_name.starts_with('.') || object_name.ends_with('.');
    !object_name.is_empty() && !has_dot_at_edge && OBJECT_NAME_RE.is_match(object_name)
}
/// Encodes an object name as URL-safe base64 so it can be embedded in a
/// subject token.
fn encode_object_name(object_name: &str) -> String {
    let raw_name = object_name.as_bytes();
    base64::encode_config(raw_name, URL_SAFE)
}
/// Settings used by `JetStream::create_object_store` to create the stream
/// that backs an object store bucket.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Config {
/// Bucket name; must match the bucket-name pattern. The backing stream is
/// named `OBJ_<bucket>`.
pub bucket: String,
/// Optional description, copied to the backing stream's description.
pub description: Option<String>,
/// Copied to the backing stream's `max_age`.
pub max_age: Duration,
/// Copied to the backing stream's `storage`.
pub storage: StorageType,
/// Copied to the backing stream's `num_replicas`.
pub num_replicas: usize,
}
impl JetStream {
    /// Creates a new object store bucket described by `config`.
    ///
    /// The bucket is backed by a stream named `OBJ_<bucket>` that captures the
    /// chunk subjects `$O.<bucket>.C.>` and the metadata subjects
    /// `$O.<bucket>.M.>`, with rollups allowed and the `New` discard policy.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Other` when the server is older than 2.6.2.
    /// * `ErrorKind::InvalidInput` when `config.bucket` is not a valid bucket name.
    /// * Any error returned while adding the stream.
    pub fn create_object_store(&self, config: &Config) -> io::Result<ObjectStore> {
        if !self.connection.is_server_compatible_version(2, 6, 2) {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "object-store requires at least server version 2.6.2",
            ));
        }
        if !is_valid_bucket_name(&config.bucket) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid bucket name",
            ));
        }

        let bucket_name = config.bucket.clone();
        let stream_name = format!("OBJ_{bucket_name}");
        let chunk_subject = format!("$O.{bucket_name}.C.>");
        let meta_subject = format!("$O.{bucket_name}.M.>");

        self.add_stream(&StreamConfig {
            name: stream_name,
            description: config.description.clone(),
            subjects: vec![chunk_subject, meta_subject],
            max_age: config.max_age,
            storage: config.storage,
            num_replicas: config.num_replicas,
            discard: DiscardPolicy::New,
            allow_rollup: true,
            ..Default::default()
        })?;

        Ok(ObjectStore::new(bucket_name, self.clone()))
    }

    /// Opens an existing object store bucket.
    ///
    /// The backing stream `OBJ_<bucket_name>` is looked up first, so a missing
    /// bucket is reported here rather than on first use.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Other` when the server is older than 2.6.2.
    /// * `ErrorKind::InvalidInput` when `bucket_name` is not a valid bucket name.
    /// * Any error returned by the stream lookup.
    pub fn object_store(&self, bucket_name: &str) -> io::Result<ObjectStore> {
        if !self.connection.is_server_compatible_version(2, 6, 2) {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "object-store requires at least server version 2.6.2",
            ));
        }
        if !is_valid_bucket_name(bucket_name) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid bucket name",
            ));
        }

        let stream_name = format!("OBJ_{bucket_name}");
        self.stream_info(stream_name)?;

        Ok(ObjectStore::new(bucket_name.to_string(), self.clone()))
    }

    /// Deletes the bucket named `bucket_name` by deleting its backing stream
    /// `OBJ_<bucket_name>`.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` when `bucket_name` is not a valid bucket name.
    /// * Any error returned while deleting the stream.
    pub fn delete_object_store(&self, bucket_name: &str) -> io::Result<()> {
        // Validate exactly like `create_object_store` / `object_store` do, so
        // that empty names or names with subject-special characters are never
        // interpolated into the stream name sent to the server.
        if !is_valid_bucket_name(bucket_name) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid bucket name",
            ));
        }

        let stream_name = format!("OBJ_{bucket_name}");
        self.delete_stream(stream_name)?;
        Ok(())
    }
}
/// Metadata record describing one stored object. It is serialized as JSON
/// and published on the object's metadata subject.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
/// Plain (un-encoded) object name.
pub name: String,
/// Optional free-form description.
pub description: Option<String>,
/// Set when this entry is a link to another object instead of holding data.
pub link: Option<ObjectLink>,
/// Name of the bucket the object was stored in.
pub bucket: String,
/// Unique id generated per upload; the object's chunks are published on
/// `$O.<bucket>.C.<nuid>`.
pub nuid: String,
/// Total number of bytes in the object.
pub size: usize,
/// Number of chunk messages the object was split into.
pub chunks: usize,
/// Modification time, serialized in RFC 3339 format under the JSON key `mtime`.
#[serde(with = "rfc3339", rename = "mtime")]
pub modified: DateTime,
/// `SHA-256=` followed by the URL-safe base64 encoding of the SHA-256 digest
/// of the object's bytes.
pub digest: String,
/// `true` once the object has been deleted; defaults to `false` when the
/// field is absent from the JSON.
#[serde(default)]
pub deleted: bool,
}
/// Caller-supplied metadata for an object that is about to be stored with
/// `ObjectStore::put`.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMeta {
/// Plain (un-encoded) object name.
pub name: String,
/// Optional free-form description.
pub description: Option<String>,
/// Optional link to another object.
pub link: Option<ObjectLink>,
}
/// Builds metadata that carries only a name; no description and no link.
impl From<&str> for ObjectMeta {
    fn from(s: &str) -> ObjectMeta {
        let name = String::from(s);
        ObjectMeta {
            name,
            description: None,
            link: None,
        }
    }
}
/// Reference from one object entry to another object.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectLink {
/// Name of the object the link points to.
pub name: String,
/// Bucket associated with the link, if any.
// NOTE(review): presumably the bucket that holds the link target — confirm
// against the object store specification.
pub bucket: Option<String>,
}
/// Handle to a single object store bucket.
pub struct ObjectStore {
// Bucket name; used to derive the stream name `OBJ_<name>` and the
// `$O.<name>.…` subjects.
name: String,
// JetStream context used for every publish, lookup, purge and subscription.
context: JetStream,
}
/// A stored object whose contents are streamed through `io::Read`.
pub struct Object {
/// Metadata of the object being read.
pub info: ObjectInfo,
// Subscription that delivers the object's chunk messages.
subscription: PushSubscription,
// Bytes of an already received chunk that did not fit into the caller's
// buffer; served first on the next `read`.
remaining_bytes: Vec<u8>,
// Cleared once the final chunk has been received and verified.
has_pending_messages: bool,
// Running SHA-256 over the received chunks; taken (set to `None`) when the
// final chunk is processed.
digest: Option<ring::digest::Context>,
}
impl Object {
    /// Creates a reader for the object described by `info` whose chunks are
    /// delivered by `subscription`.
    pub(crate) fn new(subscription: PushSubscription, info: ObjectInfo) -> Self {
        // An object without chunks (an empty or deleted object) has nothing
        // published on its chunk subject. Marking it as finished up front lets
        // `read` report end-of-file immediately instead of asking the
        // subscription for a message that will never arrive.
        // NOTE(review): assumes `PushSubscription::next` waits for a message —
        // confirm. The digest of such an object is not verified.
        let has_pending_messages = info.chunks > 0;

        Object {
            subscription,
            info,
            remaining_bytes: Vec::new(),
            has_pending_messages,
            digest: Some(ring::digest::Context::new(&SHA256)),
        }
    }

    /// Returns the metadata of the object being read.
    pub fn info(&self) -> &ObjectInfo {
        &self.info
    }
}
impl io::Read for Object {
    /// Copies the next bytes of the object into `buffer`.
    ///
    /// Bytes left over from a previously received chunk are served first.
    /// Otherwise the next chunk message is taken from the subscription; the
    /// part that does not fit into `buffer` is kept for later calls. Returns
    /// `Ok(0)` once all chunks have been consumed or the subscription yields
    /// no further message.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidData` when, after the final chunk, the
    /// computed SHA-256 digest differs from `info.digest`, or when the digest
    /// state is unexpectedly missing.
    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
        // Serve leftovers of an earlier chunk before touching the subscription.
        if !self.remaining_bytes.is_empty() {
            let len = cmp::min(buffer.len(), self.remaining_bytes.len());
            buffer[..len].copy_from_slice(&self.remaining_bytes[..len]);
            // Remove the consumed prefix in place rather than allocating a new
            // vector for the tail on every call.
            self.remaining_bytes.drain(..len);
            return Ok(len);
        }

        if !self.has_pending_messages {
            return Ok(0);
        }

        let message = match self.subscription.next() {
            Some(message) => message,
            None => return Ok(0),
        };

        let len = cmp::min(buffer.len(), message.data.len());
        buffer[..len].copy_from_slice(&message.data[..len]);
        if let Some(context) = &mut self.digest {
            context.update(&message.data);
        }
        self.remaining_bytes.extend_from_slice(&message.data[len..]);

        // A message with no pending successors is the last chunk.
        let is_last_chunk = message
            .jetstream_message_info()
            .map_or(false, |message_info| message_info.pending == 0);

        if is_last_chunk {
            // Mark the stream as finished before verifying, so that a failed
            // verification does not leave later `read` calls asking the
            // exhausted subscription for more messages.
            self.has_pending_messages = false;

            let context = self.digest.take().ok_or_else(|| {
                io::Error::new(ErrorKind::InvalidData, "digest should be Some")
            })?;
            let computed = format!(
                "SHA-256={}",
                base64::encode_config(context.finish(), URL_SAFE)
            );
            if computed != self.info.digest {
                return Err(io::Error::new(ErrorKind::InvalidData, "wrong digest"));
            }
        }

        Ok(len)
    }
}
impl ObjectStore {
    /// Wraps a bucket name and the JetStream context used to reach it.
    pub(crate) fn new(name: String, context: JetStream) -> Self {
        ObjectStore { name, context }
    }

    /// Fetches the most recent metadata record stored for `object_name`.
    ///
    /// `object_name` is the plain name; it is base64-encoded here before
    /// being used as a subject token, so callers must not encode it themselves.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` when the encoded name is rejected by the
    ///   object-name check (for example when `object_name` is empty).
    /// * Any error from fetching the last message on the metadata subject or
    ///   from decoding its JSON payload.
    pub fn info(&self, object_name: &str) -> io::Result<ObjectInfo> {
        let encoded_name = encode_object_name(object_name);
        if !is_valid_object_name(&encoded_name) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid object name",
            ));
        }

        let stream_name = format!("OBJ_{}", &self.name);
        let subject = format!("$O.{}.M.{}", &self.name, &encoded_name);
        let message = self.context.get_last_message(stream_name, &subject)?;
        let object_info = serde_json::from_slice::<ObjectInfo>(&message.data)?;
        Ok(object_info)
    }

    /// Sets the `sealed` flag on the bucket's backing stream configuration and
    /// sends the updated configuration to the server.
    ///
    /// # Errors
    ///
    /// Any error from looking up or updating the stream.
    pub fn seal(&self) -> io::Result<()> {
        let stream_name = format!("OBJ_{}", self.name);
        let stream_info = self.context.stream_info(stream_name)?;

        let mut stream_config = stream_info.config;
        stream_config.sealed = true;
        self.context.update_stream(&stream_config)?;
        Ok(())
    }

    /// Stores the bytes read from `data` as an object described by `meta`.
    ///
    /// The data is published in chunks of at most `DEFAULT_CHUNK_SIZE` bytes
    /// on a freshly generated chunk subject, followed by a metadata message
    /// carrying the size, chunk count and SHA-256 digest. When an object with
    /// the same name already existed, its old chunks are purged afterwards.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` when the encoded object name is invalid.
    /// * Any error from reading `data` (other than `Interrupted`, which is
    ///   retried), from publishing, from JSON encoding, or from purging.
    pub fn put<T>(&self, meta: T, data: &mut impl io::Read) -> io::Result<ObjectInfo>
    where
        ObjectMeta: From<T>,
    {
        let object_meta: ObjectMeta = meta.into();
        let object_name = encode_object_name(&object_meta.name);
        if !is_valid_object_name(&object_name) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid object name",
            ));
        }

        // Remember the previous version (if any) so its chunks can be purged
        // once the new metadata is in place. `info` encodes the name itself,
        // so it must be given the plain name: passing the already encoded
        // name would look up a different subject and never find the object.
        let maybe_existing_object_info = self.info(&object_meta.name).ok();

        let object_nuid = nuid::next();
        let chunk_subject = format!("$O.{}.C.{}", &self.name, &object_nuid);

        let mut object_chunks = 0;
        let mut object_size = 0;
        let mut context = ring::digest::Context::new(&SHA256);
        let mut buffer = [0; DEFAULT_CHUNK_SIZE];

        loop {
            let n = match data.read(&mut buffer) {
                Ok(0) => break,
                Ok(n) => n,
                // `Interrupted` is documented as non-fatal for `Read::read`;
                // retry instead of abandoning a partially uploaded object.
                Err(err) if err.kind() == ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            };

            context.update(&buffer[..n]);
            object_size += n;
            object_chunks += 1;
            self.context.publish(&chunk_subject, &buffer[..n])?;
        }

        let digest = context.finish();
        let subject = format!("$O.{}.M.{}", &self.name, &object_name);
        let object_info = ObjectInfo {
            name: object_meta.name,
            description: object_meta.description,
            link: object_meta.link,
            bucket: self.name.clone(),
            nuid: object_nuid.to_string(),
            chunks: object_chunks,
            size: object_size,
            digest: format!("SHA-256={}", base64::encode_config(digest, URL_SAFE)),
            modified: OffsetDateTime::now_utc(),
            deleted: false,
        };

        let data = serde_json::to_vec(&object_info)?;
        let mut headers = HeaderMap::default();
        headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.to_string());
        let message = Message::new(&subject, None, data, Some(headers));
        self.context.publish_message(&message)?;

        // Only now that the new metadata is published is it safe to drop the
        // chunks of the version it replaces.
        if let Some(existing_object_info) = maybe_existing_object_info {
            let stream_name = format!("OBJ_{}", self.name);
            let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid);
            self.context
                .purge_stream_subject(stream_name, &chunk_subject)?;
        }

        Ok(object_info)
    }

    /// Opens the object named `object_name` for reading.
    ///
    /// When the entry is a link, the link target is opened instead. A link
    /// that names a different bucket is resolved in that bucket; a link
    /// without a bucket, or naming this bucket, is resolved here.
    ///
    /// NOTE(review): links are followed recursively without cycle detection.
    ///
    /// # Errors
    ///
    /// Any error from `info`, from opening the linked bucket, or from creating
    /// the chunk subscription.
    pub fn get(&self, object_name: &str) -> io::Result<Object> {
        let object_info = self.info(object_name)?;

        if let Some(link) = &object_info.link {
            return match link.bucket.as_deref() {
                // The target lives in another bucket: open that bucket
                // (which also validates its name) and resolve the name there.
                Some(bucket) if bucket != self.name.as_str() => {
                    self.context.object_store(bucket)?.get(&link.name)
                }
                _ => self.get(&link.name),
            };
        }

        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
        let subscription = self
            .context
            .subscribe_with_options(&chunk_subject, &SubscribeOptions::ordered())?;

        Ok(Object::new(subscription, object_info))
    }

    /// Marks the object named `object_name` as deleted and purges its chunks.
    ///
    /// A metadata record with `deleted = true` and zero size/chunks is
    /// published first; the chunk subject is purged afterwards.
    ///
    /// # Errors
    ///
    /// Any error from `info`, from JSON encoding, from publishing, or from
    /// purging.
    pub fn delete(&self, object_name: &str) -> io::Result<()> {
        let mut object_info = self.info(object_name)?;
        object_info.chunks = 0;
        object_info.size = 0;
        object_info.deleted = true;

        let data = serde_json::to_vec(&object_info)?;
        let mut headers = HeaderMap::default();
        headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.to_string());

        let subject = format!("$O.{}.M.{}", &self.name, &encode_object_name(object_name));
        let message = Message::new(&subject, None, data, Some(headers));
        self.context.publish_message(&message)?;

        let stream_name = format!("OBJ_{}", self.name);
        let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid);
        self.context
            .purge_stream_subject(stream_name, &chunk_subject)?;

        Ok(())
    }

    /// Subscribes to the bucket's metadata subjects `$O.<bucket>.M.>` using an
    /// ordered consumer that delivers the last message per subject, and
    /// returns an iterator over the received metadata records.
    ///
    /// # Errors
    ///
    /// Any error from creating the subscription.
    pub fn watch(&self) -> io::Result<Watch> {
        let subject = format!("$O.{}.M.>", &self.name);
        let subscription = self.context.subscribe_with_options(
            &subject,
            &SubscribeOptions::ordered().deliver_last_per_subject(),
        )?;

        Ok(Watch { subscription })
    }
}
/// Iterator over object metadata records, created by `ObjectStore::watch`.
pub struct Watch {
// Subscription on the bucket's metadata subjects.
subscription: PushSubscription,
}
impl Iterator for Watch {
    type Item = ObjectInfo;

    /// Returns the next metadata record received on the subscription, or
    /// `None` once the subscription yields no further message.
    ///
    /// Messages whose payload is not a valid `ObjectInfo` JSON document are
    /// skipped, so a single malformed message cannot panic the consumer.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let message = self.subscription.next()?;
            if let Ok(object_info) = serde_json::from_slice::<ObjectInfo>(&message.data) {
                return Some(object_info);
            }
        }
    }
}