use std::io;
use std::time::Duration;
use crate::header::{self, HeaderMap};
use crate::jetstream::{
DateTime, DiscardPolicy, Error, ErrorCode, JetStream, PushSubscription, StorageType,
StreamConfig, StreamInfo, StreamMessage, SubscribeOptions,
};
use crate::message::Message;
use lazy_static::lazy_static;
use regex::Regex;
/// Settings for a key-value bucket, consumed by `JetStream::create_key_value`.
#[derive(Debug, Default)]
pub struct Config {
/// Bucket name. Must be non-empty and contain only ASCII letters, digits, `_` and `-`.
/// The backing stream is named `KV_<bucket>`.
pub bucket: String,
/// Free-form description, forwarded as the backing stream's description.
pub description: String,
/// Forwarded as the backing stream's `max_msg_size`.
pub max_value_size: i32,
/// Forwarded as the backing stream's `max_msgs_per_subject`. Values of zero or less are
/// replaced by 1; values above 64 are rejected by `JetStream::create_key_value`.
pub history: i64,
/// Forwarded as the backing stream's `max_age`.
pub max_age: Duration,
/// Forwarded as the backing stream's `max_bytes`.
pub max_bytes: i64,
/// Forwarded as the backing stream's storage type.
pub storage: StorageType,
/// Forwarded as the backing stream's replica count; zero is replaced by 1.
pub num_replicas: usize,
}
/// Largest `Config::history` value accepted by `JetStream::create_key_value`.
const MAX_HISTORY: i64 = 64;
/// Wildcard token that `Store::keys` appends to the bucket's subject prefix to cover every key.
const ALL_KEYS: &str = ">";
/// Name of the message header that carries the key-value operation marker.
const KV_OPERATION: &str = "KV-Operation";
/// `KV-Operation` value written by `Store::delete` and decoded as `Operation::Delete`.
const KV_OPERATION_DELETE: &str = "DEL";
/// `KV-Operation` value written by `Store::purge` and decoded as `Operation::Purge`.
const KV_OPERATION_PURGE: &str = "PURGE";
/// Name of the rollup header that `Store::purge` sets alongside the purge marker.
const NATS_ROLLUP: &str = "Nats-Rollup";
/// Value `Store::purge` sends in the `Nats-Rollup` header. Presumably it scopes the rollup to
/// the message's subject; confirm against the NATS server documentation.
const ROLLUP_SUBJECT: &str = "sub";
/// Kind of change recorded by a key-value message, decoded from its `KV-Operation` header.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Operation {
/// A regular write. Also the result when the header is absent or holds an unrecognised value.
Put,
/// A delete marker (`KV-Operation: DEL`), as published by `Store::delete`.
Delete,
/// A purge marker (`KV-Operation: PURGE`), as published by `Store::purge`.
Purge,
}
/// Decodes the key-value [`Operation`] from an optional set of message headers.
///
/// The `KV-Operation` header selects the operation: `DEL` yields [`Operation::Delete`] and
/// `PURGE` yields [`Operation::Purge`]. Absent headers, an absent `KV-Operation` header and any
/// other header value all yield [`Operation::Put`].
fn kv_operation_from_maybe_headers(maybe_headers: Option<&HeaderMap>) -> Operation {
    match maybe_headers.and_then(|headers| headers.get(KV_OPERATION)) {
        // No headers at all, or no operation marker among them: a plain write.
        None => Operation::Put,
        Some(marker) => match marker.as_str() {
            KV_OPERATION_DELETE => Operation::Delete,
            KV_OPERATION_PURGE => Operation::Purge,
            _ => Operation::Put,
        },
    }
}
/// Decodes the key-value [`Operation`] of a message fetched directly from the stream.
///
/// Delegates to `kv_operation_from_maybe_headers` with the message's optional headers.
fn kv_operation_from_stream_message(message: &StreamMessage) -> Operation {
    let maybe_headers = message.headers.as_ref();
    kv_operation_from_maybe_headers(maybe_headers)
}
lazy_static! {
// Bucket names: one or more ASCII letters, digits, `_` or `-`, anchored at both ends.
static ref VALID_BUCKET_RE: Regex = Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap();
// Keys: one or more ASCII letters, digits, `-`, `/`, `_`, `=` or `.`, anchored at both ends.
// `is_valid_key` additionally rejects keys that start or end with `.`.
static ref VALID_KEY_RE: Regex = Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap();
}
/// Returns `true` when `bucket_name` is non-empty and consists solely of ASCII letters, digits,
/// `_` and `-`, i.e. when it matches `VALID_BUCKET_RE`.
fn is_valid_bucket_name(bucket_name: &str) -> bool {
VALID_BUCKET_RE.is_match(bucket_name)
}
/// Returns `true` when `key` is usable as a key-value key.
///
/// A valid key is non-empty, neither starts nor ends with `.`, and matches `VALID_KEY_RE`
/// (ASCII letters, digits, `-`, `/`, `_`, `=` and `.`).
fn is_valid_key(key: &str) -> bool {
    let dot_at_edge = key.starts_with('.') || key.ends_with('.');
    !key.is_empty() && !dot_at_edge && VALID_KEY_RE.is_match(key)
}
impl JetStream {
    /// Verifies that the connected server is new enough for the key-value API.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::Other` error when the server is older than 2.6.2.
    fn ensure_key_value_support(&self) -> io::Result<()> {
        if self.connection.is_server_compatible_version(2, 6, 2) {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "key-value requires at least server version 2.6.2",
            ))
        }
    }

    /// Returns a copy of the API prefix when this context was configured with a domain.
    fn key_value_domain_prefix(&self) -> Option<String> {
        if self.options.has_domain {
            Some(self.options.api_prefix.clone())
        } else {
            None
        }
    }

    /// Binds to an existing key-value bucket.
    ///
    /// The bucket is backed by the stream `KV_<bucket>`; its info is fetched to confirm that the
    /// stream exists and keeps at least one message per subject.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::Other` when the server is older than 2.6.2.
    /// * `io::ErrorKind::InvalidInput` when `bucket` is not a valid bucket name.
    /// * `io::ErrorKind::Other` when the backing stream's `max_msgs_per_subject` is below 1.
    /// * Any error returned while fetching the stream info.
    pub fn key_value(&self, bucket: &str) -> io::Result<Store> {
        self.ensure_key_value_support()?;

        if !is_valid_bucket_name(bucket) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "invalid bucket name",
            ));
        }

        let stream_name = format!("KV_{}", bucket);
        let max_per_subject = self.stream_info(&stream_name)?.config.max_msgs_per_subject;

        // A key-value stream always retains at least one message per key.
        if max_per_subject < 1 {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "bucket not valid key-value store",
            ));
        }

        Ok(Store {
            name: bucket.to_string(),
            stream_name,
            prefix: format!("$KV.{}.", bucket),
            context: self.clone(),
            domain_prefix: self.key_value_domain_prefix(),
        })
    }

    /// Creates a key-value bucket from `config` and returns a handle to it.
    ///
    /// The backing stream is named `KV_<bucket>`, captures `$KV.<bucket>.>`, allows rollups and
    /// denies deletes. `history` falls back to 1 when it is zero or negative, and `num_replicas`
    /// falls back to 1 when it is zero. The discard policy is `New` on servers from version 2.7.2
    /// and `Old` on earlier ones.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::Other` when the server is older than 2.6.2, when the bucket name is
    ///   invalid, or when `config.history` exceeds 64.
    /// * Any error returned by the account info request or by the stream creation.
    pub fn create_key_value(&self, config: &Config) -> io::Result<Store> {
        self.ensure_key_value_support()?;

        let discard_policy = match self.connection.is_server_compatible_version(2, 7, 2) {
            true => DiscardPolicy::New,
            false => DiscardPolicy::Old,
        };

        if !is_valid_bucket_name(&config.bucket) {
            return Err(io::Error::new(io::ErrorKind::Other, "invalid bucket name"));
        }

        // Only the success of the request matters here; the returned info is not used.
        self.account_info()?;

        let history = match config.history {
            requested if requested > MAX_HISTORY => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "history limited to a max of 64",
                ));
            }
            requested if requested > 0 => requested,
            _ => 1,
        };

        let num_replicas = config.num_replicas.max(1);

        let stream_config = StreamConfig {
            name: format!("KV_{}", config.bucket),
            description: Some(config.description.clone()),
            subjects: vec![format!("$KV.{}.>", config.bucket)],
            max_msgs_per_subject: history,
            max_bytes: config.max_bytes,
            max_age: config.max_age,
            max_msg_size: config.max_value_size,
            storage: config.storage,
            allow_rollup: true,
            deny_delete: true,
            num_replicas,
            discard: discard_policy,
            ..Default::default()
        };
        let stream_info = self.add_stream(&stream_config)?;

        Ok(Store {
            name: config.bucket.clone(),
            // Use the stream name reported back by the server.
            stream_name: stream_info.config.name,
            prefix: format!("$KV.{}.", config.bucket),
            context: self.clone(),
            domain_prefix: self.key_value_domain_prefix(),
        })
    }

    /// Deletes the key-value bucket `bucket` by deleting its backing stream `KV_<bucket>`.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::Other` when the server is older than 2.6.2 or the bucket name is invalid.
    /// * Any error returned by the stream deletion.
    pub fn delete_key_value(&self, bucket: &str) -> io::Result<()> {
        self.ensure_key_value_support()?;

        if !is_valid_bucket_name(bucket) {
            return Err(io::Error::new(io::ErrorKind::Other, "invalid bucket name"));
        }

        let stream_name = format!("KV_{}", bucket);
        self.delete_stream(&stream_name).map(|_| ())
    }
}
/// A single revision of a key, as produced by `Store::entry`, `History` and `Watch`.
#[derive(Debug, Clone)]
pub struct Entry {
/// Name of the bucket the entry belongs to.
pub bucket: String,
/// Key of the entry, without the bucket's subject prefix.
pub key: String,
/// Payload of the underlying message.
pub value: Vec<u8>,
/// Stream sequence number of the underlying message.
pub revision: u64,
/// Pending-message count reported with the message by `History` and `Watch`;
/// `Store::entry` always sets it to 0.
pub delta: u64,
/// Timestamp taken from the underlying message.
pub created: DateTime,
/// Operation decoded from the message's `KV-Operation` header.
pub operation: Operation,
}
#[derive(Debug, Clone)]
pub struct Store {
name: String,
stream_name: String,
prefix: String,
context: JetStream,
domain_prefix: Option<String>,
}
impl Store {
pub fn status(&self) -> io::Result<BucketStatus> {
let info = self.context.stream_info(&self.stream_name)?;
Ok(BucketStatus {
bucket: self.name.to_string(),
info,
})
}
pub fn entry(&self, key: &str) -> io::Result<Option<Entry>> {
if !is_valid_key(key) {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
let mut subject = String::new();
subject.push_str(&self.prefix);
subject.push_str(key);
match self.context.get_last_message(&self.stream_name, &subject) {
Ok(message) => {
let operation = kv_operation_from_stream_message(&message);
let entry = Entry {
bucket: self.name.clone(),
key: key.to_string(),
value: message.data,
revision: message.sequence,
created: message.time,
operation,
delta: 0,
};
Ok(Some(entry))
}
Err(err) => {
if let Some(inner_err) = err.get_ref() {
if let Some(error) = inner_err.downcast_ref::<Error>() {
if error.error_code() == ErrorCode::NoMessageFound {
return Ok(None);
}
}
}
Err(err)
}
}
}
pub fn get(&self, key: &str) -> io::Result<Option<Vec<u8>>> {
match self.entry(key) {
Ok(Some(entry)) => match entry.operation {
Operation::Put => Ok(Some(entry.value)),
_ => Ok(None),
},
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}
pub fn put(&self, key: &str, value: impl AsRef<[u8]>) -> io::Result<u64> {
if !is_valid_key(key) {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);
let publish_ack = self.context.publish(&subject, value)?;
Ok(publish_ack.sequence)
}
pub fn create(&self, key: &str, value: impl AsRef<[u8]>) -> io::Result<u64> {
let result = self.update(key, &value, 0);
if result.is_ok() {
return result;
}
if let Ok(Some(entry)) = self.entry(key) {
if entry.operation != Operation::Put {
return self.update(key, &value, entry.revision);
}
}
result
}
pub fn update(&self, key: &str, value: impl AsRef<[u8]>, revision: u64) -> io::Result<u64> {
if !is_valid_key(key) {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);
let mut headers = HeaderMap::default();
headers.insert(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
revision.to_string(),
);
let message = Message::new(&subject, None, value, Some(headers));
let publish_ack = self.context.publish_message(&message)?;
Ok(publish_ack.sequence)
}
pub fn delete(&self, key: &str) -> io::Result<()> {
if !is_valid_key(key) {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);
let mut headers = HeaderMap::default();
headers.insert(KV_OPERATION, KV_OPERATION_DELETE.to_string());
let message = Message::new(&subject, None, b"", Some(headers));
self.context.publish_message(&message)?;
Ok(())
}
pub fn purge(&self, key: &str) -> io::Result<()> {
if !is_valid_key(key) {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
let mut subject = String::new();
subject.push_str(&self.prefix);
subject.push_str(key);
let mut headers = HeaderMap::default();
headers.insert(KV_OPERATION, KV_OPERATION_PURGE.to_string());
headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.to_string());
let message = Message::new(&subject, None, b"", Some(headers));
self.context.publish_message(&message)?;
Ok(())
}
pub fn keys(&self) -> io::Result<Keys> {
let mut subject = String::new();
subject.push_str(&self.prefix);
subject.push_str(ALL_KEYS);
let subscription = self.context.subscribe_with_options(
&subject,
&SubscribeOptions::ordered()
.headers_only()
.deliver_last_per_subject(),
)?;
Ok(Keys {
prefix: self.prefix.clone(),
subscription,
done: false,
})
}
pub fn history(&self, key: &str) -> io::Result<History> {
let mut subject = String::new();
subject.push_str(&self.prefix);
subject.push_str(key);
let subscription = self.context.subscribe_with_options(
&subject,
&SubscribeOptions::ordered()
.deliver_all()
.enable_flow_control()
.idle_heartbeat(Duration::from_millis(5000)),
)?;
Ok(History {
bucket: self.name.clone(),
prefix: self.prefix.clone(),
subscription,
done: false,
})
}
pub fn watch_all(&self) -> io::Result<Watch> {
self.watch(">")
}
pub fn watch<T: AsRef<str>>(&self, key: T) -> io::Result<Watch> {
let subject = format!("{}{}", self.prefix, key.as_ref());
let subscription = self.context.subscribe_with_options(
subject.as_str(),
&SubscribeOptions::ordered()
.deliver_last_per_subject()
.enable_flow_control()
.idle_heartbeat(Duration::from_millis(5000)),
)?;
Ok(Watch {
bucket: self.name.clone(),
prefix: self.prefix.clone(),
subscription,
})
}
pub fn bucket(&self) -> &String {
&self.name
}
}
/// Iterator over the keys of a bucket, created by `Store::keys`.
pub struct Keys {
    /// Subject prefix of the bucket, stripped from message subjects to obtain keys.
    prefix: String,
    /// Subscription delivering the messages the keys are derived from.
    subscription: PushSubscription,
    /// Set once a message reporting zero pending messages has been received.
    done: bool,
}

impl Iterator for Keys {
    type Item = String;

    /// Returns the next key whose delivered message is a regular write.
    ///
    /// Delete and purge markers are skipped. Iteration ends once a message reporting zero
    /// pending messages has been handled, or when the subscription yields nothing.
    fn next(&mut self) -> Option<Self::Item> {
        while !self.done {
            let message = self.subscription.next()?;

            // Note the end of the backlog before deciding whether this message yields a key.
            if let Some(info) = message.jetstream_message_info() {
                if info.pending == 0 {
                    self.done = true;
                }
            }

            if kv_operation_from_maybe_headers(message.headers.as_ref()) == Operation::Put {
                return message
                    .subject
                    .strip_prefix(&self.prefix)
                    .map(|key| key.to_string());
            }
        }

        None
    }
}
/// Iterator over the stored entries of a key, created by `Store::history`.
pub struct History {
    /// Name of the bucket, copied into every yielded entry.
    bucket: String,
    /// Subject prefix of the bucket, stripped from message subjects to obtain keys.
    prefix: String,
    /// Subscription delivering the historical messages.
    subscription: PushSubscription,
    /// Set once a message reporting zero pending messages has been received.
    done: bool,
}

impl Iterator for History {
    type Item = Entry;

    /// Returns the next historical entry.
    ///
    /// Iteration ends after the message that reports zero pending messages has been yielded.
    /// It also ends when the subscription yields nothing or a message carries no JetStream
    /// message info.
    ///
    /// # Panics
    ///
    /// Panics if a message's subject does not start with the bucket prefix.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }

        let message = self.subscription.next()?;
        let info = message.jetstream_message_info()?;

        // This is the last message of the backlog: yield it, then stop on the next call.
        if info.pending == 0 {
            self.done = true;
        }

        let operation = kv_operation_from_maybe_headers(message.headers.as_ref());
        let key = message
            .subject
            .strip_prefix(&self.prefix)
            .unwrap()
            .to_string();

        Some(Entry {
            bucket: self.bucket.clone(),
            key,
            value: message.data.clone(),
            revision: info.stream_seq,
            created: info.published,
            delta: info.pending,
            operation,
        })
    }
}
/// Iterator over the entries published to watched keys, created by `Store::watch`.
pub struct Watch {
    /// Name of the bucket, copied into every yielded entry.
    bucket: String,
    /// Subject prefix of the bucket, stripped from message subjects to obtain keys.
    prefix: String,
    /// Subscription delivering the watched messages.
    subscription: PushSubscription,
}

impl Iterator for Watch {
    type Item = Entry;

    /// Returns the next watched entry.
    ///
    /// Yields `None` when the subscription yields nothing or when a message carries no
    /// JetStream message info.
    ///
    /// # Panics
    ///
    /// Panics if a message's subject does not start with the bucket prefix.
    fn next(&mut self) -> Option<Self::Item> {
        let message = self.subscription.next()?;
        let info = message.jetstream_message_info()?;

        let operation = kv_operation_from_maybe_headers(message.headers.as_ref());
        let key = message
            .subject
            .strip_prefix(&self.prefix)
            .unwrap()
            .to_string();

        Some(Entry {
            bucket: self.bucket.clone(),
            key,
            value: message.data.clone(),
            revision: info.stream_seq,
            created: info.published,
            delta: info.pending,
            operation,
        })
    }
}
/// Snapshot of a bucket's backing stream, returned by `Store::status`.
pub struct BucketStatus {
    /// Info of the backing stream at the time the status was requested.
    info: StreamInfo,
    /// Name of the bucket.
    bucket: String,
}

impl BucketStatus {
    /// Returns the name of the bucket.
    pub fn bucket(&self) -> &String {
        &self.bucket
    }

    /// Returns the number of messages held by the backing stream.
    pub fn values(&self) -> u64 {
        let state = &self.info.state;
        state.messages
    }

    /// Returns the backing stream's `max_msgs_per_subject` setting.
    pub fn history(&self) -> i64 {
        let config = &self.info.config;
        config.max_msgs_per_subject
    }

    /// Returns the backing stream's `max_age` setting.
    pub fn max_age(&self) -> Duration {
        let config = &self.info.config;
        config.max_age
    }
}