#[cfg(test)]
use std::convert::TryInto;
#[cfg(test)]
use crate::{bson::Bson, bson_compat::RawError};
use crate::{
bson::{DateTime, Document, RawBson, RawDocumentBuf, Timestamp},
cursor::common::CursorSpecification,
options::ChangeStreamOptions,
};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct ResumeToken(pub(crate) RawBson);
impl ResumeToken {
pub(crate) fn initial(
options: Option<&ChangeStreamOptions>,
spec: &CursorSpecification,
) -> Option<ResumeToken> {
match &spec.post_batch_resume_token {
Some(token) if spec.is_empty => Some(token.clone()),
_ => options
.and_then(|o| o.start_after.as_ref().or(o.resume_after.as_ref()))
.cloned(),
}
}
pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
doc.map(|doc| ResumeToken(RawBson::Document(doc)))
}
#[cfg(test)]
pub(crate) fn parsed(self) -> std::result::Result<Bson, RawError> {
self.0.try_into()
}
}
#[skip_serializing_none]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ChangeStreamEvent<T> {
#[serde(rename = "_id")]
pub id: ResumeToken,
pub operation_type: OperationType,
pub ns: Option<ChangeNamespace>,
pub ns_type: Option<ChangeNamespaceType>,
pub to: Option<ChangeNamespace>,
pub lsid: Option<Document>,
pub txn_number: Option<i64>,
pub document_key: Option<Document>,
pub update_description: Option<UpdateDescription>,
pub cluster_time: Option<Timestamp>,
pub wall_time: Option<DateTime>,
pub full_document: Option<T>,
pub full_document_before_change: Option<T>,
}
#[skip_serializing_none]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct UpdateDescription {
pub updated_fields: Document,
pub removed_fields: Vec<String>,
pub truncated_arrays: Option<Vec<TruncatedArray>>,
pub disambiguated_paths: Option<Document>,
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct TruncatedArray {
pub field: String,
pub new_size: i32,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum OperationType {
Insert,
Update,
Replace,
Delete,
Drop,
Rename,
DropDatabase,
Invalidate,
Other(String),
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
enum OperationTypeHelper {
Insert,
Update,
Replace,
Delete,
Drop,
Rename,
DropDatabase,
Invalidate,
}
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum OperationTypeWrapper<'a> {
Known(OperationTypeHelper),
Unknown(&'a str),
}
impl<'a> From<&'a OperationType> for OperationTypeWrapper<'a> {
fn from(src: &'a OperationType) -> Self {
match src {
OperationType::Insert => Self::Known(OperationTypeHelper::Insert),
OperationType::Update => Self::Known(OperationTypeHelper::Update),
OperationType::Replace => Self::Known(OperationTypeHelper::Replace),
OperationType::Delete => Self::Known(OperationTypeHelper::Delete),
OperationType::Drop => Self::Known(OperationTypeHelper::Drop),
OperationType::Rename => Self::Known(OperationTypeHelper::Rename),
OperationType::DropDatabase => Self::Known(OperationTypeHelper::DropDatabase),
OperationType::Invalidate => Self::Known(OperationTypeHelper::Invalidate),
OperationType::Other(s) => Self::Unknown(s),
}
}
}
impl From<OperationTypeWrapper<'_>> for OperationType {
fn from(src: OperationTypeWrapper) -> Self {
match src {
OperationTypeWrapper::Known(h) => match h {
OperationTypeHelper::Insert => Self::Insert,
OperationTypeHelper::Update => Self::Update,
OperationTypeHelper::Replace => Self::Replace,
OperationTypeHelper::Delete => Self::Delete,
OperationTypeHelper::Drop => Self::Drop,
OperationTypeHelper::Rename => Self::Rename,
OperationTypeHelper::DropDatabase => Self::DropDatabase,
OperationTypeHelper::Invalidate => Self::Invalidate,
},
OperationTypeWrapper::Unknown(s) => Self::Other(s.to_string()),
}
}
}
impl<'de> Deserialize<'de> for OperationType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
OperationTypeWrapper::deserialize(deserializer).map(OperationType::from)
}
}
impl Serialize for OperationType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
OperationTypeWrapper::serialize(&self.into(), serializer)
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ChangeNamespace {
pub db: String,
pub coll: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChangeNamespaceType {
Collection,
Timeseries,
View,
#[serde(untagged)]
Other(String),
}