use std::{collections::HashMap, error::Error, fmt::Display};
use serde::Deserialize;
#[cfg(feature = "unstable")]
use zenoh_config::wrappers::EntityGlobalId;
use zenoh_keyexpr::OwnedKeyExpr;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::EntityGlobalIdProto;
use zenoh_protocol::core::Parameters;
pub use zenoh_protocol::network::request::ext::QueryTarget;
#[doc(inline)]
pub use zenoh_protocol::zenoh::query::ConsolidationMode;
use crate::api::{
bytes::ZBytes,
encoding::Encoding,
handlers::{Callback, CallbackParameter},
sample::Sample,
Id,
};
/// Consolidation setting attached to a query: a thin wrapper around a
/// [`ConsolidationMode`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueryConsolidation {
/// The wrapped mode; read back through [`QueryConsolidation::mode`].
pub(crate) mode: ConsolidationMode,
}
impl QueryConsolidation {
pub const DEFAULT: Self = Self::AUTO;
pub const AUTO: Self = Self {
mode: ConsolidationMode::Auto,
};
pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self {
Self { mode }
}
pub fn mode(&self) -> ConsolidationMode {
self.mode
}
}
impl From<ConsolidationMode> for QueryConsolidation {
fn from(mode: ConsolidationMode) -> Self {
Self::from_mode(mode)
}
}
impl Default for QueryConsolidation {
fn default() -> Self {
Self::DEFAULT
}
}
/// Error value carried by a [`Reply`] whose result is `Err`: a payload plus
/// the encoding describing it.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ReplyError {
/// Payload bytes of the error.
pub(crate) payload: ZBytes,
/// Encoding associated with the payload.
pub(crate) encoding: Encoding,
}
impl ReplyError {
    /// Builds a `ReplyError` from anything convertible into [`ZBytes`] and an
    /// [`Encoding`].
    pub(crate) fn new(payload: impl Into<ZBytes>, encoding: Encoding) -> Self {
        let payload: ZBytes = payload.into();
        Self { payload, encoding }
    }

    /// Returns a shared reference to the payload.
    #[inline]
    pub fn payload(&self) -> &ZBytes {
        let Self { payload, .. } = self;
        payload
    }

    /// Returns a mutable reference to the payload.
    #[inline]
    pub fn payload_mut(&mut self) -> &mut ZBytes {
        let Self { payload, .. } = self;
        payload
    }

    /// Returns a shared reference to the encoding.
    #[inline]
    pub fn encoding(&self) -> &Encoding {
        let Self { encoding, .. } = self;
        encoding
    }

    /// Builds a `ReplyError` holding `ZBytes::new()` and `Encoding::default()`.
    #[zenoh_macros::internal]
    pub fn empty() -> Self {
        // `ZBytes` into `ZBytes` is the identity conversion, so this is the
        // same value as spelling the struct literal out.
        Self::new(ZBytes::new(), Encoding::default())
    }
}
impl Display for ReplyError {
    /// Writes a one-line summary containing `payload.len()` and the
    /// encoding's own `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { payload, encoding } = self;
        let payload_len = payload.len();
        write!(
            f,
            "query returned an error with a {}-byte payload and encoding {}",
            payload_len, encoding
        )
    }
}
// Empty impl: no `Error` methods are overridden, so the trait defaults apply.
// The required `Debug` and `Display` come from the derive and the impl above.
impl Error for ReplyError {}
/// A reply to a query: either a [`Sample`] or a [`ReplyError`].
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct Reply {
/// Outcome carried by this reply.
pub(crate) result: Result<Sample, ReplyError>,
/// Optional protocol-level identifier, only compiled with the `unstable`
/// feature; exposed through `Reply::replier_id`.
// NOTE(review): presumably identifies the entity that produced the reply — confirm.
#[cfg(feature = "unstable")]
pub(crate) replier_id: Option<EntityGlobalIdProto>,
}
impl Reply {
    /// Borrows the outcome of this reply: `Ok(&Sample)` or `Err(&ReplyError)`.
    pub fn result(&self) -> Result<&Sample, &ReplyError> {
        match &self.result {
            Ok(sample) => Ok(sample),
            Err(error) => Err(error),
        }
    }

    /// Mutably borrows the outcome of this reply.
    pub fn result_mut(&mut self) -> Result<&mut Sample, &mut ReplyError> {
        match &mut self.result {
            Ok(sample) => Ok(sample),
            Err(error) => Err(error),
        }
    }

    /// Consumes the reply and returns the owned outcome.
    pub fn into_result(self) -> Result<Sample, ReplyError> {
        // `..` covers the feature-gated `replier_id` field when it exists.
        let Self { result, .. } = self;
        result
    }

    /// Returns the stored replier identifier converted to an
    /// [`EntityGlobalId`], or `None` when no identifier is stored.
    #[zenoh_macros::unstable]
    pub fn replier_id(&self) -> Option<EntityGlobalId> {
        self.replier_id.map(|proto_id| proto_id.into())
    }

    /// Builds a reply whose result is `Ok(Sample::empty())` and which carries
    /// no replier identifier.
    #[zenoh_macros::internal]
    pub fn empty() -> Self {
        Self {
            result: Ok(Sample::empty()),
            #[cfg(feature = "unstable")]
            replier_id: None,
        }
    }
}
/// A `Reply` is its own callback message type, so the conversion is the
/// identity.
impl CallbackParameter for Reply {
    type Message<'a> = Reply;

    fn from_message(reply: Self::Message<'_>) -> Self {
        reply
    }
}
/// Unwraps a [`Reply`] into its owned outcome, like `Reply::into_result`.
impl From<Reply> for Result<Sample, ReplyError> {
    fn from(reply: Reply) -> Self {
        reply.result
    }
}
/// Crate-private state for a liveliness query; holds only the reply callback.
pub(crate) struct LivelinessQueryState {
/// Callback that receives [`Reply`] values.
pub(crate) callback: Callback<Reply>,
}
/// Crate-private state kept for a query.
// NOTE(review): field semantics below are inferred from names and types only;
// confirm against the session code that reads and writes them.
pub(crate) struct QueryState {
// NOTE(review): presumably a count of "final" messages still awaited — confirm.
pub(crate) nb_final: usize,
/// Key expression associated with the query.
pub(crate) key_expr: OwnedKeyExpr,
/// Parameters associated with the query, owned (`'static`).
pub(crate) parameters: Parameters<'static>,
// NOTE(review): presumably the consolidation mode applied when receiving replies — confirm.
pub(crate) reception_mode: ConsolidationMode,
/// Optional map of replies keyed by key expression.
// NOTE(review): presumably `None` when replies are not buffered — confirm.
pub(crate) replies: Option<HashMap<OwnedKeyExpr, Reply>>,
/// Callback that receives [`Reply`] values.
pub(crate) callback: Callback<Reply>,
// NOTE(review): presumably the id of the querier that issued the query, if any — confirm.
pub(crate) querier_id: Option<Id>,
}
/// Selects which reply key expressions are accepted for a query.
///
/// Deserializable with `serde`; the default variant is
/// [`ReplyKeyExpr::MatchingQuery`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Deserialize)]
pub enum ReplyKeyExpr {
// NOTE(review): presumably accepts replies on any key expression — confirm.
Any,
// NOTE(review): presumably accepts only replies whose key expression matches the query's — confirm.
#[default]
MatchingQuery,
}