use std::fmt;
use std::pin::Pin;
use std::time::Duration;
use futures::Stream;
use serde::{Deserialize, Serialize};
use crate::client::Client;
use crate::vault::VaultClient;
use crate::Error;
#[cfg(not(feature = "rest"))]
use crate::ErrorKind;
use crate::Relationship;
/// Boxed, pinned, `Send` stream of `Result<WatchEvent, Error>` items; this is the
/// type held in `WatchStream::inner`.
type InnerWatchStream = Pin<Box<dyn Stream<Item = Result<WatchEvent, Error>> + Send>>;
/// Kind of change carried by a [`WatchEvent`].
///
/// Serialized with `snake_case` variant names, i.e. `"create"` and `"delete"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Operation {
/// The `create` operation.
Create,
/// The `delete` operation.
Delete,
}
impl Operation {
pub fn is_create(&self) -> bool {
matches!(self, Operation::Create)
}
pub fn is_delete(&self) -> bool {
matches!(self, Operation::Delete)
}
}
impl fmt::Display for Operation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Operation::Create => write!(f, "create"),
Operation::Delete => write!(f, "delete"),
}
}
}
/// A single predicate applied to watch events.
///
/// Each variant is checked locally by `WatchFilter::matches` and is also turned
/// into a query parameter by `WatchBuilder::run`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WatchFilter {
/// Accepts events whose relationship `resource_type()` is present and equal to this name.
ResourceType(String),
/// Accepts events whose relationship `subject_type()` is present and equal to this name.
SubjectType(String),
/// Accepts events whose relationship `resource()` equals this string.
Resource(String),
/// Accepts events whose relationship `subject()` equals this string.
Subject(String),
/// Accepts events whose relationship `relation()` equals this string.
Relation(String),
/// Accepts events whose operation is contained in this list (an empty list accepts nothing).
Operations(Vec<Operation>),
/// Opaque expression; `matches` accepts every event for this variant and `run`
/// forwards the text as a `filter=` query parameter.
Custom(String),
}
impl WatchFilter {
pub fn resource_type(type_name: impl Into<String>) -> Self {
WatchFilter::ResourceType(type_name.into())
}
pub fn subject_type(type_name: impl Into<String>) -> Self {
WatchFilter::SubjectType(type_name.into())
}
pub fn resource(resource_id: impl Into<String>) -> Self {
WatchFilter::Resource(resource_id.into())
}
pub fn subject(subject_id: impl Into<String>) -> Self {
WatchFilter::Subject(subject_id.into())
}
pub fn relation(relation_name: impl Into<String>) -> Self {
WatchFilter::Relation(relation_name.into())
}
pub fn operations(ops: impl IntoIterator<Item = Operation>) -> Self {
WatchFilter::Operations(ops.into_iter().collect())
}
pub fn custom(expression: impl Into<String>) -> Self {
WatchFilter::Custom(expression.into())
}
pub fn matches(&self, event: &WatchEvent) -> bool {
match self {
WatchFilter::ResourceType(t) => {
event.relationship.resource_type().is_some_and(|rt| rt == t)
}
WatchFilter::SubjectType(t) => {
event.relationship.subject_type().is_some_and(|st| st == t)
}
WatchFilter::Resource(r) => event.relationship.resource() == r,
WatchFilter::Subject(s) => event.relationship.subject() == s,
WatchFilter::Relation(r) => event.relationship.relation() == r,
WatchFilter::Operations(ops) => ops.contains(&event.operation),
WatchFilter::Custom(_) => true, }
}
}
impl fmt::Display for WatchFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WatchFilter::ResourceType(t) => write!(f, "resource_type={}", t),
WatchFilter::SubjectType(t) => write!(f, "subject_type={}", t),
WatchFilter::Resource(r) => write!(f, "resource={}", r),
WatchFilter::Subject(s) => write!(f, "subject={}", s),
WatchFilter::Relation(r) => write!(f, "relation={}", r),
WatchFilter::Operations(ops) => {
let op_strs: Vec<_> = ops.iter().map(|o| o.to_string()).collect();
write!(f, "operations=[{}]", op_strs.join(","))
}
WatchFilter::Custom(expr) => write!(f, "custom={}", expr),
}
}
}
/// One change notification delivered by a watch stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchEvent {
/// Whether the event is a create or a delete.
pub operation: Operation,
/// The relationship the event refers to; (de)serialized through `relationship_serde`
/// as an object with `resource`, `relation` and `subject` string fields.
#[serde(with = "relationship_serde")]
pub relationship: Relationship<'static>,
/// Revision number attached to the event; `WatchStream` records it as its last revision.
pub revision: u64,
/// UTC timestamp attached to the event.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Optional actor string; omitted from the serialized form when `None` and
/// defaulted to `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub actor: Option<String>,
/// Optional request id; omitted from the serialized form when `None` and
/// defaulted to `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
}
mod relationship_serde {
use super::*;
use serde::{Deserializer, Serializer};
#[derive(Deserialize)]
struct RelationshipDto {
resource: String,
relation: String,
subject: String,
}
#[derive(Serialize)]
struct RelationshipDtoRef<'a> {
resource: &'a str,
relation: &'a str,
subject: &'a str,
}
pub fn serialize<S>(rel: &Relationship<'static>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let dto = RelationshipDtoRef {
resource: rel.resource(),
relation: rel.relation(),
subject: rel.subject(),
};
dto.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Relationship<'static>, D::Error>
where
D: Deserializer<'de>,
{
let dto = RelationshipDto::deserialize(deserializer)?;
Ok(Relationship::new(dto.resource, dto.relation, dto.subject))
}
}
impl WatchEvent {
pub fn new(
operation: Operation,
relationship: Relationship<'static>,
revision: u64,
timestamp: chrono::DateTime<chrono::Utc>,
) -> Self {
Self {
operation,
relationship,
revision,
timestamp,
actor: None,
request_id: None,
}
}
#[must_use]
pub fn with_actor(mut self, actor: impl Into<String>) -> Self {
self.actor = Some(actor.into());
self
}
#[must_use]
pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
self.request_id = Some(request_id.into());
self
}
pub fn is_create(&self) -> bool {
self.operation.is_create()
}
pub fn is_delete(&self) -> bool {
self.operation.is_delete()
}
pub fn resource(&self) -> &str {
self.relationship.resource()
}
pub fn relation(&self) -> &str {
self.relationship.relation()
}
pub fn subject(&self) -> &str {
self.relationship.subject()
}
}
impl fmt::Display for WatchEvent {
    /// Renders the event as `[revision] operation subject -[relation]-> resource`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rel = &self.relationship;
        let (subject, relation, resource) = (rel.subject(), rel.relation(), rel.resource());
        write!(
            f,
            "[{}] {} {} -[{}]-> {}",
            self.revision, self.operation, subject, relation, resource
        )
    }
}
/// Retry and backoff settings for a resumable watch stream.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
/// Retry limit; `None` is the value installed by `infinite_retries()` and by `Default`.
pub max_retries: Option<u32>,
/// Delay used for attempt 0 before jitter is applied.
pub initial_backoff: Duration,
/// Cap applied to the exponential delay before jitter is applied.
pub max_backoff: Duration,
/// Factor the delay is multiplied by for each successive attempt.
pub backoff_multiplier: f64,
/// Width of the random spread as a fraction of the capped delay; the `jitter()`
/// builder clamps it to `0.0..=1.0`, direct field writes are not clamped.
pub jitter: f64,
}
impl Default for ReconnectConfig {
    /// Defaults: no retry limit, 100 ms initial delay, 30 s cap, multiplier 2.0,
    /// jitter 0.1.
    fn default() -> Self {
        let initial_backoff = Duration::from_millis(100);
        let max_backoff = Duration::from_secs(30);
        Self {
            max_retries: None,
            initial_backoff,
            max_backoff,
            backoff_multiplier: 2.0,
            jitter: 0.1,
        }
    }
}
impl ReconnectConfig {
    /// Creates a configuration holding the [`Default`] values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Limits the number of retries to `retries`.
    #[must_use]
    pub fn max_retries(mut self, retries: u32) -> Self {
        self.max_retries = Some(retries);
        self
    }

    /// Removes the retry limit (`max_retries = None`).
    #[must_use]
    pub fn infinite_retries(mut self) -> Self {
        self.max_retries = None;
        self
    }

    /// Sets the delay used for attempt 0.
    #[must_use]
    pub fn initial_backoff(mut self, duration: Duration) -> Self {
        self.initial_backoff = duration;
        self
    }

    /// Sets the cap applied to the exponential delay.
    #[must_use]
    pub fn max_backoff(mut self, duration: Duration) -> Self {
        self.max_backoff = duration;
        self
    }

    /// Sets the per-attempt growth factor. The value is stored as given.
    #[must_use]
    pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
        self.backoff_multiplier = multiplier;
        self
    }

    /// Sets the jitter fraction, clamped to `0.0..=1.0`.
    #[must_use]
    pub fn jitter(mut self, jitter: f64) -> Self {
        self.jitter = jitter.clamp(0.0, 1.0);
        self
    }

    /// Computes the delay to wait before retry number `attempt` (0-based).
    ///
    /// The delay is `initial_backoff * backoff_multiplier^attempt`, capped at
    /// `max_backoff`, then shifted by a random offset drawn from a window of width
    /// `capped * jitter` centred on the capped value. Because jitter is applied
    /// after the cap, the result can exceed `max_backoff` by up to half that window.
    ///
    /// This function never panics: a negative or NaN intermediate yields a zero
    /// delay, and a value too large for `Duration` yields `max_backoff`.
    pub fn backoff_for_attempt(&self, attempt: u32) -> Duration {
        // `powi` takes an `i32`. A plain `as` cast would wrap attempts above
        // `i32::MAX` to a negative exponent and shrink the delay, so saturate.
        let exponent = attempt.min(i32::MAX as u32) as i32;
        let base_backoff =
            self.initial_backoff.as_secs_f64() * self.backoff_multiplier.powi(exponent);
        // `f64::min` ignores a NaN operand, so an overflowing product still ends
        // up at the cap.
        let capped = base_backoff.min(self.max_backoff.as_secs_f64());
        let jitter_range = capped * self.jitter;
        let jittered = capped - jitter_range / 2.0 + rand::random::<f64>() * jitter_range;
        // `from_secs_f64` panics on values `Duration` cannot hold (for example
        // with `max_backoff = Duration::MAX`); fall back to the cap instead.
        Duration::try_from_secs_f64(jittered.max(0.0)).unwrap_or(self.max_backoff)
    }
}
/// Builder that collects watch options and produces a `WatchStream` via `run()`.
pub struct WatchBuilder {
// Only read by the `rest` variant of `run()`, hence the conditional `dead_code` allowance.
#[cfg_attr(not(feature = "rest"), allow(dead_code))]
client: Client,
// Organization id copied from the `VaultClient` as a string.
organization_id: String,
// Vault id copied from the `VaultClient` as a string.
vault_id: String,
// Filters in the order they were added.
filters: Vec<WatchFilter>,
// Optional starting revision; becomes the stream's initial `last_revision`.
from_revision: Option<u64>,
// Set by `resumable()` / `reconnect()`, cleared by `no_reconnect()`.
resumable: bool,
// Set by `reconnect()`; `no_reconnect()` does not clear it.
reconnect_config: Option<ReconnectConfig>,
}
impl WatchBuilder {
pub(crate) fn new(vault: &VaultClient) -> Self {
Self {
client: vault.client().clone(),
organization_id: vault.organization_id().to_string(),
vault_id: vault.vault_id().to_string(),
filters: Vec::new(),
from_revision: None,
resumable: false,
reconnect_config: None,
}
}
#[must_use]
pub fn filter(mut self, filter: WatchFilter) -> Self {
self.filters.push(filter);
self
}
#[must_use]
pub fn from_revision(mut self, revision: u64) -> Self {
self.from_revision = Some(revision);
self
}
#[must_use]
pub fn resumable(mut self) -> Self {
self.resumable = true;
self
}
#[must_use]
pub fn no_reconnect(mut self) -> Self {
self.resumable = false;
self
}
#[must_use]
pub fn reconnect(mut self, config: ReconnectConfig) -> Self {
self.reconnect_config = Some(config);
self.resumable = true;
self
}
pub fn filters(&self) -> &[WatchFilter] {
&self.filters
}
pub fn starting_revision(&self) -> Option<u64> {
self.from_revision
}
pub fn is_resumable(&self) -> bool {
self.resumable
}
#[cfg(feature = "rest")]
pub async fn run(self) -> Result<WatchStream, Error> {
let mut query_params = Vec::new();
if let Some(rev) = self.from_revision {
query_params.push(format!("from_revision={}", rev));
}
for filter in &self.filters {
match filter {
WatchFilter::ResourceType(t) => query_params.push(format!("resource_type={}", t)),
WatchFilter::SubjectType(t) => query_params.push(format!("subject_type={}", t)),
WatchFilter::Resource(r) => query_params.push(format!("resource={}", r)),
WatchFilter::Subject(s) => query_params.push(format!("subject={}", s)),
WatchFilter::Relation(r) => query_params.push(format!("relation={}", r)),
WatchFilter::Operations(ops) => {
for op in ops {
query_params.push(format!("operation={}", op));
}
}
WatchFilter::Custom(expr) => query_params.push(format!("filter={}", expr)),
}
}
let _path = format!(
"/v1/organizations/{}/vaults/{}/watch{}",
self.organization_id,
self.vault_id,
if query_params.is_empty() {
String::new()
} else {
format!("?{}", query_params.join("&"))
}
);
Ok(WatchStream::new(
self.client,
self.organization_id,
self.vault_id,
self.filters,
self.from_revision,
self.resumable,
self.reconnect_config,
))
}
#[cfg(not(feature = "rest"))]
pub async fn run(self) -> Result<WatchStream, Error> {
Err(Error::new(
ErrorKind::Configuration,
"REST feature is required for watch streams",
))
}
}
impl fmt::Debug for WatchBuilder {
    /// Prints the identifying and option fields; the remaining fields are elided
    /// via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("WatchBuilder");
        out.field("organization_id", &self.organization_id);
        out.field("vault_id", &self.vault_id);
        out.field("filters", &self.filters);
        out.field("from_revision", &self.from_revision);
        out.field("resumable", &self.resumable);
        out.finish_non_exhaustive()
    }
}
/// Cloneable handle used to signal a `WatchStream` to stop.
///
/// All clones share the same `tokio::sync::watch` channel, so a shutdown issued
/// through one clone is visible through the others.
#[derive(Debug, Clone)]
pub struct WatchShutdownHandle {
// Sending side of the boolean shutdown flag; `true` means shut down.
sender: tokio::sync::watch::Sender<bool>,
}
impl WatchShutdownHandle {
    /// Creates a handle together with the receiver that observes its flag.
    /// The flag starts as `false`.
    fn new() -> (Self, tokio::sync::watch::Receiver<bool>) {
        let (sender, receiver) = tokio::sync::watch::channel(false);
        (Self { sender }, receiver)
    }

    /// Sets the shutdown flag to `true`.
    ///
    /// `send_replace` is used instead of `send`: `send` returns an error and
    /// leaves the stored value untouched once every receiver has been dropped,
    /// which would make this call a silent no-op and keep `is_shutdown()` at
    /// `false` after the stream is gone.
    pub fn shutdown(&self) {
        let _ = self.sender.send_replace(true);
    }

    /// Returns the current value of the shutdown flag.
    pub fn is_shutdown(&self) -> bool {
        *self.sender.borrow()
    }
}
/// Stream of `Result<WatchEvent, Error>` items with client-side filtering and a
/// shutdown flag.
///
/// Several fields are stored but not read anywhere in this file, hence the
/// `dead_code` allowances.
pub struct WatchStream {
#[allow(dead_code)]
client: Client,
#[allow(dead_code)]
organization_id: String,
#[allow(dead_code)]
vault_id: String,
// Filters every yielded event must satisfy.
filters: Vec<WatchFilter>,
// Starts as the requested starting revision and is overwritten with the revision
// of each event read from the inner stream (including events that are filtered out).
last_revision: Option<u64>,
#[allow(dead_code)]
resumable: bool,
#[allow(dead_code)]
reconnect_config: Option<ReconnectConfig>,
// Receiving side of the shutdown flag, checked when the stream is polled.
shutdown_receiver: tokio::sync::watch::Receiver<bool>,
// Sending side of the shutdown flag; cloned out by `shutdown_handle()`.
shutdown_handle: WatchShutdownHandle,
// Underlying event stream; `None` until the first poll installs one.
inner: Option<InnerWatchStream>,
}
impl WatchStream {
    /// Creates a stream that has not been polled yet: a fresh shutdown channel is
    /// set up, `last_revision` starts at `from_revision`, and no inner stream is
    /// installed.
    fn new(
        client: Client,
        organization_id: String,
        vault_id: String,
        filters: Vec<WatchFilter>,
        from_revision: Option<u64>,
        resumable: bool,
        reconnect_config: Option<ReconnectConfig>,
    ) -> Self {
        let (shutdown_handle, shutdown_receiver) = WatchShutdownHandle::new();
        let last_revision = from_revision;
        Self {
            client,
            organization_id,
            vault_id,
            filters,
            last_revision,
            resumable,
            reconnect_config,
            shutdown_receiver,
            shutdown_handle,
            inner: None,
        }
    }

    /// Returns a clone of the handle that can shut this stream down.
    pub fn shutdown_handle(&self) -> WatchShutdownHandle {
        WatchShutdownHandle::clone(&self.shutdown_handle)
    }

    /// Returns the most recently recorded revision, or the starting revision if no
    /// event has been read yet.
    pub fn last_revision(&self) -> Option<u64> {
        self.last_revision
    }

    /// Returns `true` when `event` satisfies every configured filter (vacuously
    /// true when there are no filters).
    fn matches_filters(&self, event: &WatchEvent) -> bool {
        for filter in &self.filters {
            if !filter.matches(event) {
                return false;
            }
        }
        true
    }
}
impl Stream for WatchStream {
type Item = Result<WatchEvent, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if *self.shutdown_receiver.borrow() {
return std::task::Poll::Ready(None);
}
if self.inner.is_none() {
let stream: Pin<Box<dyn Stream<Item = Result<WatchEvent, Error>> + Send>> =
Box::pin(futures::stream::empty());
self.inner = Some(stream);
}
if let Some(ref mut inner) = self.inner {
match inner.as_mut().poll_next(cx) {
std::task::Poll::Ready(Some(Ok(event))) => {
self.last_revision = Some(event.revision);
if self.matches_filters(&event) {
std::task::Poll::Ready(Some(Ok(event)))
} else {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
}
other => other,
}
} else {
std::task::Poll::Ready(None)
}
}
}
impl fmt::Debug for WatchStream {
    /// Prints the identifying fields, filters, last revision and resumable flag;
    /// the remaining fields are elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("WatchStream");
        out.field("organization_id", &self.organization_id);
        out.field("vault_id", &self.vault_id);
        out.field("filters", &self.filters);
        out.field("last_revision", &self.last_revision);
        out.field("resumable", &self.resumable);
        out.finish_non_exhaustive()
    }
}
// Unit tests for the watch types defined in this file. Tests that need a `Client`
// build one on top of the crate's `MockTransport`.
#[cfg(test)]
mod tests {
use super::*;
// Both predicates are checked on both variants.
#[test]
fn test_operation_is_create() {
assert!(Operation::Create.is_create());
assert!(!Operation::Create.is_delete());
assert!(!Operation::Delete.is_create());
assert!(Operation::Delete.is_delete());
}
// `Display` yields the lowercase operation names.
#[test]
fn test_operation_display() {
assert_eq!(Operation::Create.to_string(), "create");
assert_eq!(Operation::Delete.to_string(), "delete");
}
// Each convenience constructor produces the matching variant.
#[test]
fn test_watch_filter_constructors() {
assert_eq!(
WatchFilter::resource_type("document"),
WatchFilter::ResourceType("document".to_string())
);
assert_eq!(
WatchFilter::subject_type("user"),
WatchFilter::SubjectType("user".to_string())
);
assert_eq!(
WatchFilter::resource("document:readme"),
WatchFilter::Resource("document:readme".to_string())
);
assert_eq!(
WatchFilter::subject("user:alice"),
WatchFilter::Subject("user:alice".to_string())
);
assert_eq!(
WatchFilter::relation("viewer"),
WatchFilter::Relation("viewer".to_string())
);
assert_eq!(
WatchFilter::operations([Operation::Create]),
WatchFilter::Operations(vec![Operation::Create])
);
assert_eq!(
WatchFilter::custom("custom_expr"),
WatchFilter::Custom("custom_expr".to_string())
);
}
// `Display` renders `key=value`, with operations as a bracketed comma list.
#[test]
fn test_watch_filter_display() {
assert_eq!(
WatchFilter::resource_type("document").to_string(),
"resource_type=document"
);
assert_eq!(
WatchFilter::subject_type("user").to_string(),
"subject_type=user"
);
assert_eq!(
WatchFilter::resource("document:readme").to_string(),
"resource=document:readme"
);
assert_eq!(
WatchFilter::subject("user:alice").to_string(),
"subject=user:alice"
);
assert_eq!(
WatchFilter::relation("viewer").to_string(),
"relation=viewer"
);
assert_eq!(
WatchFilter::operations([Operation::Create, Operation::Delete]).to_string(),
"operations=[create,delete]"
);
assert_eq!(WatchFilter::custom("expr").to_string(), "custom=expr");
}
// Positive and negative match for every filter variant; `Custom` always matches.
#[test]
fn test_watch_filter_matches() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Create, rel, 1, chrono::Utc::now());
assert!(WatchFilter::resource_type("document").matches(&event));
assert!(!WatchFilter::resource_type("folder").matches(&event));
assert!(WatchFilter::subject_type("user").matches(&event));
assert!(!WatchFilter::subject_type("group").matches(&event));
assert!(WatchFilter::resource("document:readme").matches(&event));
assert!(!WatchFilter::resource("document:other").matches(&event));
assert!(WatchFilter::subject("user:alice").matches(&event));
assert!(!WatchFilter::subject("user:bob").matches(&event));
assert!(WatchFilter::relation("viewer").matches(&event));
assert!(!WatchFilter::relation("editor").matches(&event));
assert!(WatchFilter::operations([Operation::Create]).matches(&event));
assert!(WatchFilter::operations([Operation::Create, Operation::Delete]).matches(&event));
assert!(!WatchFilter::operations([Operation::Delete]).matches(&event));
assert!(WatchFilter::custom("anything").matches(&event));
}
// `new` stores its arguments and leaves the optional metadata unset.
#[test]
fn test_watch_event_new() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let now = chrono::Utc::now();
let event = WatchEvent::new(Operation::Create, rel.clone(), 42, now);
assert_eq!(event.operation, Operation::Create);
assert_eq!(event.revision, 42);
assert_eq!(event.timestamp, now);
assert_eq!(event.resource(), "document:readme");
assert_eq!(event.relation(), "viewer");
assert_eq!(event.subject(), "user:alice");
assert!(event.actor.is_none());
assert!(event.request_id.is_none());
}
// The `with_*` builders populate the optional metadata.
#[test]
fn test_watch_event_with_metadata() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Delete, rel, 100, chrono::Utc::now())
.with_actor("admin")
.with_request_id("req-123");
assert_eq!(event.actor, Some("admin".to_string()));
assert_eq!(event.request_id, Some("req-123".to_string()));
}
// Event-level predicates follow the event's operation.
#[test]
fn test_watch_event_is_create_delete() {
let rel = Relationship::new("doc:1", "v", "u:1");
let create_event = WatchEvent::new(Operation::Create, rel.clone(), 1, chrono::Utc::now());
let delete_event = WatchEvent::new(Operation::Delete, rel, 2, chrono::Utc::now());
assert!(create_event.is_create());
assert!(!create_event.is_delete());
assert!(!delete_event.is_create());
assert!(delete_event.is_delete());
}
// The display string contains the revision, operation and all three relationship parts.
#[test]
fn test_watch_event_display() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Create, rel, 42, chrono::Utc::now());
let display = event.to_string();
assert!(display.contains("[42]"));
assert!(display.contains("create"));
assert!(display.contains("user:alice"));
assert!(display.contains("viewer"));
assert!(display.contains("document:readme"));
}
// Pins the default reconnect settings.
#[test]
fn test_reconnect_config_default() {
let config = ReconnectConfig::default();
assert_eq!(config.max_retries, None);
assert_eq!(config.initial_backoff, Duration::from_millis(100));
assert_eq!(config.max_backoff, Duration::from_secs(30));
assert_eq!(config.backoff_multiplier, 2.0);
assert!((config.jitter - 0.1).abs() < f64::EPSILON);
}
// Every builder setter stores the value it is given.
#[test]
fn test_reconnect_config_builder() {
let config = ReconnectConfig::new()
.max_retries(5)
.initial_backoff(Duration::from_millis(200))
.max_backoff(Duration::from_secs(60))
.backoff_multiplier(1.5)
.jitter(0.2);
assert_eq!(config.max_retries, Some(5));
assert_eq!(config.initial_backoff, Duration::from_millis(200));
assert_eq!(config.max_backoff, Duration::from_secs(60));
assert_eq!(config.backoff_multiplier, 1.5);
assert!((config.jitter - 0.2).abs() < f64::EPSILON);
}
// `infinite_retries` clears a previously set limit.
#[test]
fn test_reconnect_config_infinite_retries() {
let config = ReconnectConfig::new().max_retries(5).infinite_retries();
assert_eq!(config.max_retries, None);
}
// Out-of-range jitter values are clamped to 0.0..=1.0.
#[test]
fn test_reconnect_config_jitter_clamped() {
let config = ReconnectConfig::new().jitter(2.0);
assert!((config.jitter - 1.0).abs() < f64::EPSILON);
let config = ReconnectConfig::new().jitter(-0.5);
assert!(config.jitter.abs() < f64::EPSILON);
}
// With jitter disabled the delay doubles per attempt and respects the cap.
#[test]
fn test_reconnect_config_backoff_calculation() {
let config = ReconnectConfig::new()
.initial_backoff(Duration::from_millis(100))
.max_backoff(Duration::from_secs(10))
.backoff_multiplier(2.0)
.jitter(0.0);
let b0 = config.backoff_for_attempt(0);
let b1 = config.backoff_for_attempt(1);
let b2 = config.backoff_for_attempt(2);
assert_eq!(b0, Duration::from_millis(100));
assert_eq!(b1, Duration::from_millis(200));
assert_eq!(b2, Duration::from_millis(400));
let b10 = config.backoff_for_attempt(10);
assert!(b10 <= Duration::from_secs(10));
}
// `shutdown` flips the flag on both the handle and its receiver.
#[test]
fn test_watch_shutdown_handle() {
let (handle, receiver) = WatchShutdownHandle::new();
assert!(!handle.is_shutdown());
assert!(!*receiver.borrow());
handle.shutdown();
assert!(handle.is_shutdown());
assert!(*receiver.borrow());
}
// JSON round trip of a fully populated event.
#[test]
fn test_watch_event_serialization() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Create, rel, 42, chrono::Utc::now())
.with_actor("admin")
.with_request_id("req-123");
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("\"operation\":\"create\""));
assert!(json.contains("\"revision\":42"));
assert!(json.contains("\"actor\":\"admin\""));
assert!(json.contains("\"request_id\":\"req-123\""));
let parsed: WatchEvent = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.operation, Operation::Create);
assert_eq!(parsed.revision, 42);
assert_eq!(parsed.resource(), "document:readme");
assert_eq!(parsed.relation(), "viewer");
assert_eq!(parsed.subject(), "user:alice");
assert_eq!(parsed.actor, Some("admin".to_string()));
assert_eq!(parsed.request_id, Some("req-123".to_string()));
}
// Operations (de)serialize as lowercase JSON strings.
#[test]
fn test_operation_serialization() {
assert_eq!(
serde_json::to_string(&Operation::Create).unwrap(),
"\"create\""
);
assert_eq!(
serde_json::to_string(&Operation::Delete).unwrap(),
"\"delete\""
);
assert_eq!(
serde_json::from_str::<Operation>("\"create\"").unwrap(),
Operation::Create
);
assert_eq!(
serde_json::from_str::<Operation>("\"delete\"").unwrap(),
Operation::Delete
);
}
// Type filters reject relationships whose identifiers contain no `:` separator.
#[test]
fn test_watch_filter_matches_no_type_colon() {
let rel = Relationship::new("nocooldoc", "viewer", "justauser");
let event = WatchEvent::new(Operation::Create, rel, 1, chrono::Utc::now());
assert!(!WatchFilter::resource_type("document").matches(&event));
assert!(!WatchFilter::subject_type("user").matches(&event));
}
// Unset optional fields are omitted from the JSON output.
#[test]
fn test_watch_event_without_optional_fields() {
let rel = Relationship::new("doc:1", "viewer", "user:1");
let event = WatchEvent::new(Operation::Create, rel, 1, chrono::Utc::now());
let json = serde_json::to_string(&event).unwrap();
assert!(!json.contains("actor"));
assert!(!json.contains("request_id"));
}
// With jitter 0.5 the first delay stays within 50..=150 ms around the 100 ms base.
#[test]
fn test_reconnect_config_backoff_with_jitter() {
let config = ReconnectConfig::new()
.initial_backoff(Duration::from_millis(100))
.max_backoff(Duration::from_secs(10))
.backoff_multiplier(2.0)
.jitter(0.5);
let backoff = config.backoff_for_attempt(0);
assert!(backoff >= Duration::from_millis(50) && backoff <= Duration::from_millis(150));
}
// NOTE(review): despite its name this exercises the `Debug` output of `WatchFilter`.
#[test]
fn test_watch_builder_debug() {
let filter = WatchFilter::resource_type("document");
let debug_str = format!("{:?}", filter);
assert!(debug_str.contains("ResourceType"));
}
// An empty operations list matches nothing.
#[test]
fn test_watch_filter_empty_operations() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Create, rel, 1, chrono::Utc::now());
let filter = WatchFilter::operations([]);
assert!(!filter.matches(&event));
}
// Cloning preserves the configured values.
#[test]
fn test_reconnect_config_clone() {
let config = ReconnectConfig::new()
.max_retries(5)
.initial_backoff(Duration::from_millis(200));
let cloned = config.clone();
assert_eq!(cloned.max_retries, Some(5));
assert_eq!(cloned.initial_backoff, Duration::from_millis(200));
}
// `Operation` is usable as a hash-set key and deduplicates.
#[test]
fn test_operation_hash() {
use std::collections::HashSet;
let mut set = HashSet::new();
set.insert(Operation::Create);
set.insert(Operation::Delete);
set.insert(Operation::Create);
assert_eq!(set.len(), 2);
assert!(set.contains(&Operation::Create));
assert!(set.contains(&Operation::Delete));
}
// Equal filters hash identically and deduplicate in a set.
#[test]
fn test_watch_filter_hash_and_eq() {
use std::collections::HashSet;
let mut set = HashSet::new();
set.insert(WatchFilter::resource_type("document"));
set.insert(WatchFilter::resource_type("folder"));
set.insert(WatchFilter::resource_type("document"));
assert_eq!(set.len(), 2);
}
// Accessors and metadata on a delete event.
#[test]
fn test_watch_event_accessor_methods() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Delete, rel, 999, chrono::Utc::now())
.with_actor("admin@example.com")
.with_request_id("req-abc-123");
assert_eq!(event.resource(), "document:readme");
assert_eq!(event.relation(), "viewer");
assert_eq!(event.subject(), "user:alice");
assert!(event.is_delete());
assert!(!event.is_create());
assert_eq!(event.actor.as_deref(), Some("admin@example.com"));
assert_eq!(event.request_id.as_deref(), Some("req-abc-123"));
}
// A shutdown through one handle is visible through its clone.
#[test]
fn test_watch_shutdown_handle_clone() {
let (handle, _receiver) = WatchShutdownHandle::new();
let cloned = handle.clone();
assert!(!cloned.is_shutdown());
handle.shutdown();
assert!(cloned.is_shutdown());
}
// The derived `Debug` output names the type.
#[test]
fn test_watch_shutdown_handle_debug() {
let (handle, _receiver) = WatchShutdownHandle::new();
let debug_str = format!("{:?}", handle);
assert!(debug_str.contains("WatchShutdownHandle"));
}
// A cloned filter compares equal to the original.
#[test]
fn test_watch_filter_clone() {
let filter = WatchFilter::operations([Operation::Create, Operation::Delete]);
let cloned = filter.clone();
assert_eq!(filter, cloned);
}
// `new()` and `default()` agree on every field.
#[test]
fn test_reconnect_config_new_equals_default() {
let new = ReconnectConfig::new();
let default = ReconnectConfig::default();
assert_eq!(new.max_retries, default.max_retries);
assert_eq!(new.initial_backoff, default.initial_backoff);
assert_eq!(new.max_backoff, default.max_backoff);
assert_eq!(new.backoff_multiplier, default.backoff_multiplier);
assert!((new.jitter - default.jitter).abs() < f64::EPSILON);
}
// A freshly built stream has a `Debug` form, is not shut down and has no revision.
#[tokio::test]
async fn test_watch_stream_basic() {
use crate::auth::BearerCredentialsConfig;
use crate::transport::mock::MockTransport;
use std::sync::Arc;
let mock_transport = Arc::new(MockTransport::new());
let client = crate::Client::builder()
.url("https://api.example.com")
.credentials(BearerCredentialsConfig::new("test"))
.build_with_transport(mock_transport)
.await
.unwrap();
let stream = WatchStream::new(
client,
"org_test".to_string(),
"vlt_test".to_string(),
vec![],
None,
false,
None,
);
let debug = format!("{:?}", stream);
assert!(debug.contains("WatchStream"));
let handle = stream.shutdown_handle();
assert!(!handle.is_shutdown());
assert!(stream.last_revision().is_none());
}
// After shutdown the stream ends immediately.
#[tokio::test]
async fn test_watch_stream_shutdown() {
use crate::auth::BearerCredentialsConfig;
use crate::transport::mock::MockTransport;
use futures::StreamExt;
use std::sync::Arc;
let mock_transport = Arc::new(MockTransport::new());
let client = crate::Client::builder()
.url("https://api.example.com")
.credentials(BearerCredentialsConfig::new("test"))
.build_with_transport(mock_transport)
.await
.unwrap();
let mut stream = WatchStream::new(
client,
"org_test".to_string(),
"vlt_test".to_string(),
vec![],
Some(100),
true,
Some(ReconnectConfig::default()),
);
let handle = stream.shutdown_handle();
handle.shutdown();
assert!(handle.is_shutdown());
let result = stream.next().await;
assert!(result.is_none());
}
// Same shutdown behaviour when filters are configured.
#[tokio::test]
async fn test_watch_stream_with_filters() {
use crate::auth::BearerCredentialsConfig;
use crate::transport::mock::MockTransport;
use futures::StreamExt;
use std::sync::Arc;
let mock_transport = Arc::new(MockTransport::new());
let client = crate::Client::builder()
.url("https://api.example.com")
.credentials(BearerCredentialsConfig::new("test"))
.build_with_transport(mock_transport)
.await
.unwrap();
let filters = vec![
WatchFilter::resource_type("document"),
WatchFilter::operations([Operation::Create]),
];
let mut stream = WatchStream::new(
client,
"org_test".to_string(),
"vlt_test".to_string(),
filters,
None,
false,
None,
);
stream.shutdown_handle().shutdown();
let result = stream.next().await;
assert!(result.is_none());
}
// With the `rest` feature, `run` succeeds and carries the starting revision over.
#[cfg(feature = "rest")]
#[tokio::test]
async fn test_watch_builder_run() {
use crate::auth::BearerCredentialsConfig;
use crate::transport::mock::MockTransport;
use std::sync::Arc;
let mock_transport = Arc::new(MockTransport::new());
let client = crate::Client::builder()
.url("https://api.example.com")
.credentials(BearerCredentialsConfig::new("test"))
.build_with_transport(mock_transport)
.await
.unwrap();
let vault = client.organization("org_test").vault("vlt_test");
let result = vault
.watch()
.filter(WatchFilter::resource_type("document"))
.from_revision(100)
.resumable()
.reconnect(ReconnectConfig::new().max_retries(5))
.run()
.await;
assert!(result.is_ok());
let stream = result.unwrap();
assert_eq!(stream.last_revision(), Some(100));
}
// `no_reconnect` leaves the builder non-resumable with empty defaults.
#[tokio::test]
async fn test_watch_builder_no_reconnect() {
use crate::auth::BearerCredentialsConfig;
use crate::transport::mock::MockTransport;
use std::sync::Arc;
let mock_transport = Arc::new(MockTransport::new());
let client = crate::Client::builder()
.url("https://api.example.com")
.credentials(BearerCredentialsConfig::new("test"))
.build_with_transport(mock_transport)
.await
.unwrap();
let vault = client.organization("org_test").vault("vlt_test");
let builder = vault.watch().no_reconnect();
let debug = format!("{:?}", builder);
assert!(debug.contains("WatchBuilder"));
assert!(builder.filters().is_empty());
assert!(builder.starting_revision().is_none());
assert!(!builder.is_resumable());
}
// NOTE(review): despite its name this calls `WatchFilter::matches` directly, not the stream.
#[test]
fn test_watch_stream_matches_filters() {
let rel = Relationship::new("document:readme", "viewer", "user:alice");
let event = WatchEvent::new(Operation::Create, rel, 1, chrono::Utc::now());
let filter = WatchFilter::resource_type("document");
assert!(filter.matches(&event));
let filter = WatchFilter::resource_type("folder");
assert!(!filter.matches(&event));
}
// All builder options together are reflected by the accessors and `Debug` output.
#[tokio::test]
async fn test_watch_builder_full_options() {
use crate::auth::BearerCredentialsConfig;
use crate::transport::mock::MockTransport;
use std::sync::Arc;
let mock_transport = Arc::new(MockTransport::new());
let client = crate::Client::builder()
.url("https://api.example.com")
.credentials(BearerCredentialsConfig::new("test"))
.build_with_transport(mock_transport)
.await
.unwrap();
let vault = client.organization("org_test").vault("vlt_test");
let builder = vault
.watch()
.filter(WatchFilter::resource_type("doc"))
.from_revision(42)
.resumable()
.reconnect(ReconnectConfig::default());
let debug = format!("{:?}", builder);
assert!(debug.contains("WatchBuilder"));
assert!(debug.contains("org_test"));
assert!(debug.contains("vlt_test"));
assert_eq!(builder.filters().len(), 1);
assert_eq!(builder.starting_revision(), Some(42));
assert!(builder.is_resumable());
}
}