pub use crate::rpc::pb::mvccpb::event::EventType;
use crate::error::{Error, Result};
use crate::intercept::InterceptedChannel;
use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
use crate::rpc::pb::etcdserverpb::{
WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
WatchResponse as PbWatchResponse,
};
use crate::rpc::pb::mvccpb::Event as PbEvent;
use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::Streaming;
#[repr(transparent)]
#[derive(Clone)]
pub struct WatchClient {
inner: PbWatchClient<InterceptedChannel>,
}
impl WatchClient {
#[inline]
pub(crate) fn new(channel: InterceptedChannel) -> Self {
let inner = PbWatchClient::new(channel);
Self { inner }
}
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
pub async fn watch(
&mut self,
key: impl Into<Vec<u8>>,
options: Option<WatchOptions>,
) -> Result<WatchStream> {
let (request_sender, request_receiver) = channel::<WatchRequest>(100);
request_sender
.send(options.unwrap_or_default().with_key(key).into())
.await
.map_err(|e| Error::WatchError(e.to_string()))?;
let request_stream = ReceiverStream::new(request_receiver);
let response_stream = self.inner.watch(request_stream).await?.into_inner();
Ok(WatchStream::new(request_sender, response_stream))
}
}
#[derive(Debug, Default, Clone)]
pub struct WatchOptions {
req: WatchCreateRequest,
key_range: KeyRange,
}
impl WatchOptions {
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key_range.with_key(key);
self
}
#[inline]
pub const fn new() -> Self {
Self {
req: WatchCreateRequest {
key: Vec::new(),
range_end: Vec::new(),
start_revision: 0,
progress_notify: false,
filters: Vec::new(),
prev_kv: false,
watch_id: 0,
fragment: false,
},
key_range: KeyRange::new(),
}
}
#[inline]
pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
self.key_range.with_range(end);
self
}
#[inline]
pub fn with_from_key(mut self) -> Self {
self.key_range.with_from_key();
self
}
#[inline]
pub fn with_prefix(mut self) -> Self {
self.key_range.with_prefix();
self
}
#[inline]
pub fn with_all_keys(mut self) -> Self {
self.key_range.with_all_keys();
self
}
#[inline]
pub const fn with_start_revision(mut self, revision: i64) -> Self {
self.req.start_revision = revision;
self
}
#[inline]
pub const fn with_progress_notify(mut self) -> Self {
self.req.progress_notify = true;
self
}
#[inline]
pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
self
}
#[inline]
pub const fn with_prev_key(mut self) -> Self {
self.req.prev_kv = true;
self
}
#[inline]
pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
self.req.watch_id = watch_id;
self
}
#[inline]
pub const fn with_fragment(mut self) -> Self {
self.req.fragment = true;
self
}
}
impl From<WatchOptions> for WatchCreateRequest {
#[inline]
fn from(mut options: WatchOptions) -> Self {
let (key, range_end) = options.key_range.build();
options.req.key = key;
options.req.range_end = range_end;
options.req
}
}
impl From<WatchOptions> for WatchRequest {
#[inline]
fn from(options: WatchOptions) -> Self {
Self {
request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
}
}
}
impl From<WatchCancelRequest> for WatchRequest {
#[inline]
fn from(req: WatchCancelRequest) -> Self {
Self {
request_union: Some(WatchRequestUnion::CancelRequest(req)),
}
}
}
impl From<WatchProgressRequest> for WatchRequest {
#[inline]
fn from(req: WatchProgressRequest) -> Self {
Self {
request_union: Some(WatchRequestUnion::ProgressRequest(req)),
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum WatchFilterType {
NoPut = 0,
NoDelete = 1,
}
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct WatchResponse(PbWatchResponse);
impl WatchResponse {
#[inline]
const fn new(resp: PbWatchResponse) -> Self {
Self(resp)
}
#[inline]
pub fn header(&self) -> Option<&ResponseHeader> {
self.0.header.as_ref().map(From::from)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub const fn watch_id(&self) -> i64 {
self.0.watch_id
}
#[inline]
pub const fn created(&self) -> bool {
self.0.created
}
#[inline]
pub const fn canceled(&self) -> bool {
self.0.canceled
}
#[inline]
pub const fn compact_revision(&self) -> i64 {
self.0.compact_revision
}
#[inline]
pub fn cancel_reason(&self) -> &str {
&self.0.cancel_reason
}
#[inline]
pub fn events(&self) -> &[Event] {
unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
}
}
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Event(PbEvent);
impl Event {
#[inline]
pub fn event_type(&self) -> EventType {
match self.0.r#type {
0 => EventType::Put,
1 => EventType::Delete,
i => panic!("unknown event {i}"),
}
}
#[inline]
pub fn kv(&self) -> Option<&KeyValue> {
self.0.kv.as_ref().map(From::from)
}
#[inline]
pub fn prev_kv(&self) -> Option<&KeyValue> {
self.0.prev_kv.as_ref().map(From::from)
}
}
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug)]
pub struct WatchStream {
request_sender: WatchRequestSender,
response_stream: WatchResponseStream,
}
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug)]
pub struct WatchRequestSender(Sender<WatchRequest>);
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug)]
pub struct WatchResponseStream(Streaming<PbWatchResponse>);
impl WatchResponseStream {
#[inline]
pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
self.0
.message()
.await
.map(|resp| resp.map(WatchResponse::new))
.map_err(From::from)
}
}
impl WatchStream {
#[inline]
const fn new(
request_sender: Sender<WatchRequest>,
response_stream: Streaming<PbWatchResponse>,
) -> Self {
Self {
request_sender: WatchRequestSender(request_sender),
response_stream: WatchResponseStream(response_stream),
}
}
#[inline]
pub async fn watch(
&mut self,
key: impl Into<Vec<u8>>,
options: Option<WatchOptions>,
) -> Result<()> {
self.request_sender.watch(key, options).await
}
#[inline]
pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
self.request_sender.cancel(watch_id).await
}
#[inline]
pub async fn request_progress(&mut self) -> Result<()> {
self.request_sender.request_progress().await
}
#[inline]
pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
self.response_stream.message().await
}
pub fn split(self) -> (WatchRequestSender, WatchResponseStream) {
(self.request_sender, self.response_stream)
}
}
impl WatchRequestSender {
#[inline]
async fn send(&mut self, req: WatchRequest) -> Result<()> {
self.0
.send(req)
.await
.map_err(|e| Error::WatchError(e.to_string()))
}
#[inline]
pub async fn watch(
&mut self,
key: impl Into<Vec<u8>>,
options: Option<WatchOptions>,
) -> Result<()> {
self.send(options.unwrap_or_default().with_key(key).into())
.await
}
#[inline]
pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
let req = WatchCancelRequest { watch_id };
self.send(req.into()).await
}
#[inline]
pub async fn request_progress(&mut self) -> Result<()> {
let req = WatchProgressRequest {};
self.send(req.into()).await
}
}
impl Stream for WatchResponseStream {
type Item = Result<WatchResponse>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().0)
.poll_next(cx)
.map(|t| match t {
Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
Some(Err(e)) => Some(Err(From::from(e))),
None => None,
})
}
}
impl Stream for WatchStream {
type Item = Result<WatchResponse>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().response_stream).poll_next(cx)
}
}