use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::config::{ClientConfig, ClientConfigBuilder};
use crate::connection::ConnectionState;
use crate::error::KubemqError;
use crate::server_info::ServerInfo;
use crate::transport::grpc::GrpcTransport;
use crate::Result;
/// State shared by every clone of [`KubemqClient`] (held behind an `Arc`).
///
/// Its `Drop` impl cancels both tokens when the value is dropped while
/// `closed` is still `false`.
struct Inner {
/// Transport that `state`, `ping`, `close` and `config` delegate to.
transport: GrpcTransport,
/// Set to `true` by `KubemqClient::close`; read by `check_closed` and `Drop`.
closed: AtomicBool,
/// Client-level token: parent of the tokens returned by `child_token`,
/// cancelled by `close` and by `Drop` when `close` was never called.
cancel: CancellationToken,
/// Token obtained from `GrpcTransport::cancel_token` at construction time.
/// NOTE(review): presumably the token the transport's own tasks observe —
/// confirm in `GrpcTransport`.
transport_cancel: CancellationToken,
}
/// Client handle wrapping a reference-counted `Inner`.
///
/// Cloning copies the `Arc`, so all clones share the same transport, closed
/// flag and cancellation tokens; closing through one clone is observed by
/// every other clone.
#[derive(Clone)]
pub struct KubemqClient {
/// Shared state; dropped when the last clone goes away.
inner: Arc<Inner>,
}
impl KubemqClient {
    /// Returns a new [`ClientConfigBuilder`].
    pub fn builder() -> ClientConfigBuilder {
        ClientConfigBuilder::new()
    }

    /// Wraps an existing transport in a client handle.
    ///
    /// The new client starts in the "not closed" state with a fresh
    /// client-level cancellation token.
    pub(crate) fn from_transport(transport: GrpcTransport) -> Self {
        // The transport's token must be fetched before `transport` is moved.
        let transport_cancel = transport.cancel_token();
        let inner = Inner {
            transport,
            closed: AtomicBool::new(false),
            cancel: CancellationToken::new(),
            transport_cancel,
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Returns a token that is a child of the client-level cancellation token.
    pub(crate) fn child_token(&self) -> CancellationToken {
        let root = &self.inner.cancel;
        root.child_token()
    }

    /// Returns the connection state reported by the transport.
    pub fn state(&self) -> ConnectionState {
        self.transport().state()
    }

    /// Pings the server through the transport.
    ///
    /// # Errors
    ///
    /// Returns [`KubemqError::ClientClosed`] if `close` has already been
    /// called; otherwise returns whatever the transport's `ping` returns.
    pub async fn ping(&self) -> Result<ServerInfo> {
        self.check_closed()?;
        let transport = self.transport();
        transport.ping().await
    }

    /// Closes the client.
    ///
    /// Only the first call does any work: it cancels the client-level token,
    /// awaits one scheduler yield bounded by the configured drain timeout,
    /// closes the transport, and then awaits the `on_closed` callback if one
    /// is configured. Every later call returns `Ok(())` immediately.
    pub async fn close(&self) -> Result<()> {
        // `swap` returns the previous value, so exactly one caller sees `false`.
        let already_closed = self.inner.closed.swap(true, Ordering::AcqRel);
        if !already_closed {
            self.inner.cancel.cancel();

            // NOTE(review): the future under the timeout is a single
            // `yield_now`, so this does not wait for child tasks to finish —
            // confirm that is the intended drain behavior.
            let drain_window = self.config().drain_timeout();
            let yield_once = async {
                tokio::task::yield_now().await;
            };
            let _ = tokio::time::timeout(drain_window, yield_once).await;

            self.inner.transport.close().await;

            if let Some(on_closed) = &self.config().on_closed {
                on_closed().await;
            }
        }
        Ok(())
    }

    /// Fails with [`KubemqError::ClientClosed`] once `close` has been called.
    pub(crate) fn check_closed(&self) -> Result<()> {
        if self.inner.closed.load(Ordering::Acquire) {
            Err(KubemqError::ClientClosed)
        } else {
            Ok(())
        }
    }

    /// Validates that the client may perform an operation.
    ///
    /// Currently equivalent to [`Self::check_closed`]; `_operation` is unused.
    pub(crate) fn check_state(&self, _operation: &str) -> Result<()> {
        self.check_closed()
    }

    /// Returns the configuration held by the transport.
    pub fn config(&self) -> &ClientConfig {
        self.transport().config()
    }

    /// Borrows the underlying transport.
    pub(crate) fn transport(&self) -> &GrpcTransport {
        &self.inner.transport
    }
}
impl Drop for Inner {
    /// Cancels both tokens and logs a warning when the shared state is
    /// dropped while the closed flag is still `false`.
    fn drop(&mut self) {
        // Nothing to do when the closed flag is already set.
        if self.closed.load(Ordering::Acquire) {
            return;
        }

        tracing::warn!("KubemqClient dropped without calling close()");
        self.cancel.cancel();
        self.transport_cancel.cancel();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a token together with a clone used to observe its state.
    fn token_with_observer() -> (CancellationToken, CancellationToken) {
        let token = CancellationToken::new();
        let observer = token.clone();
        (token, observer)
    }

    /// Cancelling each token is visible through its clone. This exercises the
    /// token primitives directly; it does not construct an `Inner`.
    #[test]
    fn test_cancel_tokens_cancelled_on_drop() {
        let (client_token, client_observer) = token_with_observer();
        let (transport_token, transport_observer) = token_with_observer();

        assert!(!client_observer.is_cancelled());
        assert!(!transport_observer.is_cancelled());

        client_token.cancel();
        transport_token.cancel();

        assert!(client_observer.is_cancelled());
        assert!(transport_observer.is_cancelled());
    }

    /// Two separately created tokens do not affect one another.
    #[test]
    fn test_transport_cancel_independent_of_client_cancel() {
        let (client_token, client_observer) = token_with_observer();
        let (transport_token, transport_observer) = token_with_observer();

        client_token.cancel();
        assert!(client_observer.is_cancelled());
        assert!(
            !transport_observer.is_cancelled(),
            "transport_cancel should be independent"
        );

        transport_token.cancel();
        assert!(transport_observer.is_cancelled());
    }

    /// Replays the guard used in `Drop for Inner` with the flag already set:
    /// neither token may be cancelled.
    #[test]
    fn test_closed_flag_prevents_cancel_on_drop() {
        let closed = AtomicBool::new(true);
        let (client_token, client_observer) = token_with_observer();
        let (transport_token, transport_observer) = token_with_observer();

        let already_closed = closed.load(Ordering::Acquire);
        if !already_closed {
            client_token.cancel();
            transport_token.cancel();
        }

        assert!(
            !client_observer.is_cancelled(),
            "cancel should NOT fire when closed=true"
        );
        assert!(
            !transport_observer.is_cancelled(),
            "transport_cancel should NOT fire when closed=true"
        );
    }
}