use arc_bytes::serde::Bytes;
use serde::{Deserialize, Serialize};
use crate::connection::{AsyncLowLevelConnection, LowLevelConnection};
use crate::document::{CollectionHeader, DocumentId, HasHeader, Header, Revision};
use crate::key::KeyEncoding;
use crate::schema::{Collection, CollectionName, SerializedCollection};
use crate::Error;
/// An ordered list of [`Operation`]s that is handed to a connection as one
/// value through [`Transaction::apply`] or [`Transaction::apply_async`].
///
/// NOTE(review): whether the operations are applied atomically is decided by
/// the connection's `apply_transaction` implementation, which is not visible
/// in this file — confirm before relying on it.
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
#[must_use]
pub struct Transaction {
/// The operations that make up this transaction, in insertion order.
pub operations: Vec<Operation>,
}
impl Transaction {
    /// Returns a transaction that contains no operations.
    pub fn new() -> Self {
        Self {
            operations: Vec::new(),
        }
    }

    /// Appends `operation` to the end of this transaction's operation list.
    pub fn push(&mut self, operation: Operation) {
        let Self { operations } = self;
        operations.push(operation);
    }

    /// Builder-style variant of [`Transaction::push`]: appends `operation`
    /// and returns the transaction so further calls can be chained.
    pub fn with(mut self, operation: Operation) -> Self {
        self.operations.push(operation);
        self
    }

    /// Consumes the transaction and passes it to `db.apply_transaction`.
    ///
    /// The value returned by the connection is forwarded unchanged: on
    /// success, a list of [`OperationResult`]s.
    ///
    /// # Errors
    ///
    /// Returns whatever error `db.apply_transaction` reports.
    pub fn apply<Connection: LowLevelConnection>(
        self,
        db: &Connection,
    ) -> Result<Vec<OperationResult>, Error> {
        db.apply_transaction(self)
    }

    /// Async counterpart of [`Transaction::apply`]: consumes the transaction,
    /// passes it to `db.apply_transaction` and awaits the outcome.
    ///
    /// # Errors
    ///
    /// Returns whatever error `db.apply_transaction` reports.
    pub async fn apply_async<Connection: AsyncLowLevelConnection>(
        self,
        db: &Connection,
    ) -> Result<Vec<OperationResult>, Error> {
        db.apply_transaction(self).await
    }
}
/// Wraps a single [`Operation`] in a [`Transaction`] whose operation list
/// contains only that operation.
impl From<Operation> for Transaction {
    fn from(operation: Operation) -> Self {
        let operations = Vec::from([operation]);
        Self { operations }
    }
}
/// Shorthand constructors that build a transaction holding exactly one
/// operation. Each one forwards its arguments to the [`Operation`]
/// constructor of the same name.
impl Transaction {
    /// Returns a single-operation transaction built from
    /// [`Operation::insert`].
    pub fn insert(
        collection: CollectionName,
        id: Option<DocumentId>,
        contents: impl Into<Bytes>,
    ) -> Self {
        let operation = Operation::insert(collection, id, contents);
        operation.into()
    }

    /// Returns a single-operation transaction built from
    /// [`Operation::update`].
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
        let operation = Operation::update(collection, header, contents);
        operation.into()
    }

    /// Returns a single-operation transaction built from
    /// [`Operation::overwrite`].
    pub fn overwrite(
        collection: CollectionName,
        id: DocumentId,
        contents: impl Into<Bytes>,
    ) -> Self {
        let operation = Operation::overwrite(collection, id, contents);
        operation.into()
    }

    /// Returns a single-operation transaction built from
    /// [`Operation::delete`].
    pub fn delete(collection: CollectionName, header: Header) -> Self {
        let operation = Operation::delete(collection, header);
        operation.into()
    }
}
/// A single [`Command`] paired with the collection it is addressed to.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[must_use]
pub struct Operation {
/// The name of the collection this operation targets.
pub collection: CollectionName,
/// The command to perform on `collection`.
pub command: Command,
}
impl Operation {
    /// Builds an operation carrying [`Command::Insert`] for `collection`.
    ///
    /// `contents` is converted into [`Bytes`]; `id` is stored as given.
    /// NOTE(review): presumably an id is generated by the storage layer when
    /// `id` is `None` — confirm against the connection implementation.
    pub fn insert(
        collection: CollectionName,
        id: Option<DocumentId>,
        contents: impl Into<Bytes>,
    ) -> Self {
        let command = Command::Insert {
            id,
            contents: contents.into(),
        };
        Self {
            collection,
            command,
        }
    }

    /// Builds an insert operation for collection `C` from typed values.
    ///
    /// The optional `id` is encoded with [`DocumentId::new`] first, then
    /// `contents` is serialized with `C::serialize`.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding the id or serializing the contents fails.
    pub fn insert_serialized<C: SerializedCollection>(
        id: Option<&C::PrimaryKey>,
        contents: &C::Contents,
    ) -> Result<Self, Error> {
        let document_id = match id {
            Some(key) => Some(DocumentId::new(key)?),
            None => None,
        };
        let serialized = C::serialize(contents)?;
        Ok(Self::insert(C::collection_name(), document_id, serialized))
    }

    /// Builds an insert operation for collection `C`, taking the id from
    /// `C::natural_id(contents)` when that returns a key.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding the natural id or serializing the
    /// contents fails.
    pub fn push_serialized<C: SerializedCollection>(contents: &C::Contents) -> Result<Self, Error> {
        let natural_key = C::natural_id(contents);
        let document_id = match &natural_key {
            Some(key) => Some(DocumentId::new(key)?),
            None => None,
        };
        let serialized = C::serialize(contents)?;
        Ok(Self::insert(C::collection_name(), document_id, serialized))
    }

    /// Builds an operation carrying [`Command::Update`] for `collection`.
    ///
    /// `contents` is converted into [`Bytes`]; `header` is stored as given.
    pub fn update(collection: CollectionName, header: Header, contents: impl Into<Bytes>) -> Self {
        let command = Command::Update {
            header,
            contents: contents.into(),
        };
        Self {
            collection,
            command,
        }
    }

    /// Builds an update operation for collection `C` from typed values.
    ///
    /// `contents` is serialized with `C::serialize`, then the typed `header`
    /// is converted into a [`Header`].
    ///
    /// # Errors
    ///
    /// Returns an error if serializing the contents or converting the header
    /// fails.
    pub fn update_serialized<C: SerializedCollection>(
        header: CollectionHeader<C::PrimaryKey>,
        contents: &C::Contents,
    ) -> Result<Self, Error> {
        let serialized = C::serialize(contents)?;
        let collection = C::collection_name();
        let header = Header::try_from(header)?;
        Ok(Self::update(collection, header, serialized))
    }

    /// Builds an operation carrying [`Command::Overwrite`] for `collection`.
    ///
    /// `contents` is converted into [`Bytes`]. Unlike [`Operation::update`],
    /// only an id is supplied, not a full header.
    pub fn overwrite(
        collection: CollectionName,
        id: DocumentId,
        contents: impl Into<Bytes>,
    ) -> Self {
        let command = Command::Overwrite {
            id,
            contents: contents.into(),
        };
        Self {
            collection,
            command,
        }
    }

    /// Builds an overwrite operation for collection `C` from typed values.
    ///
    /// `contents` is serialized with `C::serialize`, then `id` is encoded
    /// with [`DocumentId::new`].
    ///
    /// # Errors
    ///
    /// Returns an error if serializing the contents or encoding the id fails.
    pub fn overwrite_serialized<C: SerializedCollection, Key>(
        id: &Key,
        contents: &C::Contents,
    ) -> Result<Self, Error>
    where
        Key: KeyEncoding<C::PrimaryKey> + ?Sized,
    {
        let serialized = C::serialize(contents)?;
        let collection = C::collection_name();
        let document_id = DocumentId::new(id)?;
        Ok(Self::overwrite(collection, document_id, serialized))
    }

    /// Builds an operation carrying [`Command::Delete`] for `collection`,
    /// identifying the document by `header`.
    pub const fn delete(collection: CollectionName, header: Header) -> Self {
        Self {
            collection,
            command: Command::Delete { header },
        }
    }

    /// Builds an operation carrying [`Command::Check`] for `id` with no
    /// revision attached.
    pub const fn check_document_id_exists(collection: CollectionName, id: DocumentId) -> Self {
        Self {
            collection,
            command: Command::Check { id, revision: None },
        }
    }

    /// Typed variant of [`Operation::check_document_id_exists`]: encodes `id`
    /// with [`DocumentId::new`] and targets `C::collection_name()`.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding the id fails.
    pub fn check_document_exists<C: Collection>(id: &C::PrimaryKey) -> Result<Self, Error> {
        let collection = C::collection_name();
        let document_id = DocumentId::new(id)?;
        Ok(Self::check_document_id_exists(collection, document_id))
    }

    /// Builds an operation carrying [`Command::Check`] with both the id and
    /// the revision taken from `doc_or_header.header()`.
    ///
    /// # Errors
    ///
    /// Returns an error if `doc_or_header.header()` fails.
    pub fn check_document_is_current<C: Collection, H: HasHeader>(
        doc_or_header: &H,
    ) -> Result<Self, Error> {
        let header = doc_or_header.header()?;
        let collection = C::collection_name();
        let command = Command::Check {
            id: header.id,
            revision: Some(header.revision),
        };
        Ok(Self {
            collection,
            command,
        })
    }
}
/// The action an [`Operation`] asks a connection to perform.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum Command {
/// Insert a new document.
Insert {
/// The id for the new document, if one is supplied.
/// NOTE(review): presumably an id is assigned by the storage layer when
/// this is `None` — confirm.
id: Option<DocumentId>,
/// The serialized document contents.
contents: Bytes,
},
/// Update the document identified by `header`.
/// NOTE(review): presumably the revision in `header` must match the stored
/// revision — confirm against the connection implementation.
Update {
/// The header identifying the document to update.
header: Header,
/// The new serialized document contents.
contents: Bytes,
},
/// Store `contents` under `id`. No header (and therefore no revision) is
/// supplied with this command.
Overwrite {
/// The id of the document to write.
id: DocumentId,
/// The serialized document contents.
contents: Bytes,
},
/// Delete the document identified by `header`.
Delete {
/// The header identifying the document to delete.
header: Header,
},
/// Check a document by id and, optionally, revision.
///
/// Built with `revision: None` by [`Operation::check_document_id_exists`]
/// and with `Some(revision)` by [`Operation::check_document_is_current`].
Check {
/// The id of the document to check.
id: DocumentId,
/// The revision to compare against, when one is supplied.
revision: Option<Revision>,
},
}
/// The per-operation outcome returned in the list produced by
/// [`Transaction::apply`] / [`Transaction::apply_async`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OperationResult {
/// The operation succeeded; no further data is attached.
Success,
/// A document was updated.
/// NOTE(review): presumably also reported for inserts and overwrites, with
/// `header` being the document's new header — confirm.
DocumentUpdated {
/// The collection containing the document.
collection: CollectionName,
/// The header reported for the document.
header: Header,
},
/// A document was deleted.
DocumentDeleted {
/// The collection that contained the document.
collection: CollectionName,
/// The id of the deleted document.
id: DocumentId,
},
}
/// An identifier paired with a set of [`Changes`].
///
/// NOTE(review): presumably a record of a transaction that has already been
/// applied, with `id` being its transaction id — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Executed {
/// The numeric identifier of this record.
pub id: u64,
/// The changes associated with `id`.
pub changes: Changes,
}
/// A set of changes, affecting either documents or keys.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Changes {
/// Changes to documents, grouped with the collection names they refer to.
Documents(DocumentChanges),
/// A list of changed keys.
Keys(Vec<ChangedKey>),
}
impl Changes {
    /// Returns the document changes when this value is
    /// [`Changes::Documents`], and `None` otherwise.
    #[must_use]
    pub const fn documents(&self) -> Option<&DocumentChanges> {
        match self {
            Self::Documents(changes) => Some(changes),
            Self::Keys(_) => None,
        }
    }

    /// Returns the changed keys when this value is [`Changes::Keys`], and
    /// `None` otherwise.
    #[must_use]
    pub fn keys(&self) -> Option<&[ChangedKey]> {
        match self {
            Self::Keys(keys) => Some(keys.as_slice()),
            Self::Documents(_) => None,
        }
    }
}
/// A list of changed documents plus the collection names they refer to.
///
/// Each [`ChangedDocument`] stores an index into `collections` instead of a
/// full collection name.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DocumentChanges {
/// The collection names referenced by index from
/// [`ChangedDocument::collection`].
pub collections: Vec<CollectionName>,
/// The changed documents.
pub documents: Vec<ChangedDocument>,
}
impl DocumentChanges {
    /// Returns the changed document at `index` together with the name of the
    /// collection it refers to.
    ///
    /// Returns `None` when `index` is past the end of `documents`, or when
    /// the document's collection index has no entry in `collections`.
    #[must_use]
    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
        let document = self.documents.get(index)?;
        let collection = self.collections.get(usize::from(document.collection))?;
        Some((collection, document))
    }

    /// Returns the number of entries in `documents`.
    #[must_use]
    pub fn len(&self) -> usize {
        self.documents.len()
    }

    /// Returns `true` when `documents` has no entries.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.documents.is_empty()
    }

    /// Returns a borrowing iterator that starts at the first changed
    /// document and yields `(collection name, changed document)` pairs.
    pub const fn iter(&self) -> DocumentChangesIter<'_> {
        DocumentChangesIter {
            changes: self,
            index: Some(0),
        }
    }
}
/// A borrowing iterator over a [`DocumentChanges`], created by
/// [`DocumentChanges::iter`].
#[must_use]
pub struct DocumentChangesIter<'a> {
/// The changes being iterated.
changes: &'a DocumentChanges,
/// The index of the next entry to read, or `None` once advancing the index
/// would overflow `usize`.
index: Option<usize>,
}
impl<'a> Iterator for DocumentChangesIter<'a> {
    type Item = (&'a CollectionName, &'a ChangedDocument);

    /// Yields the entry at the current index and advances past it.
    ///
    /// Returns `None` without advancing when [`DocumentChanges::get`] has no
    /// entry for the current index; this covers both the end of the list and
    /// a document whose collection index cannot be resolved.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.index?;
        let entry = self.changes.get(current)?;
        // `checked_add` leaves `None` behind on overflow, which ends iteration.
        self.index = current.checked_add(1);
        Some(entry)
    }
}
/// An owning iterator over a [`DocumentChanges`], created by its
/// [`IntoIterator`] implementation.
#[must_use]
pub struct DocumentChangesIntoIter {
/// The collection names, looked up by each document's collection index.
collections: Vec<CollectionName>,
/// The remaining changed documents.
documents: std::vec::IntoIter<ChangedDocument>,
}
impl Iterator for DocumentChangesIntoIter {
    type Item = (CollectionName, ChangedDocument);

    /// Takes the next changed document and pairs it with a clone of the
    /// collection name it refers to.
    ///
    /// Returns `None` when no documents remain, or when the document's
    /// collection index has no entry in `collections`. In the latter case
    /// the document has already been consumed.
    fn next(&mut self) -> Option<Self::Item> {
        let document = self.documents.next()?;
        let collection = self.collections.get(usize::from(document.collection))?;
        Some((collection.clone(), document))
    }
}
impl IntoIterator for DocumentChanges {
type IntoIter = DocumentChangesIntoIter;
type Item = (CollectionName, ChangedDocument);
fn into_iter(self) -> Self::IntoIter {
DocumentChangesIntoIter {
collections: self.collections,
documents: self.documents.into_iter(),
}
}
}
/// Exercises `len`, `is_empty`, the borrowing iterator and the owning
/// iterator of [`DocumentChanges`].
#[test]
fn document_changes_iter() {
    use crate::schema::Qualified;

    // (collection index, numeric document id) pairs. The last entry points at
    // collection index 2, which has no entry in `collections`, so both
    // iterators stop before yielding it.
    let entries: [(u16, u64); 4] = [(0, 0), (0, 1), (1, 2), (2, 3)];
    let changes = DocumentChanges {
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
        documents: entries
            .iter()
            .map(|&(collection, id)| ChangedDocument {
                collection,
                id: DocumentId::from_u64(id),
                deleted: false,
            })
            .collect(),
    };

    // `len` counts every stored document, including the unresolvable one.
    assert_eq!(changes.len(), 4);
    assert!(!changes.is_empty());

    // Borrowing iteration.
    let (mut in_a, mut in_b) = (0, 0);
    let mut seen = Vec::new();
    for (collection, document) in changes.iter() {
        assert!(!seen.contains(&document.id));
        seen.push(document.id.clone());
        match collection.name.as_ref() {
            "a" => in_a += 1,
            "b" => in_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(in_a, 2);
    assert_eq!(in_b, 1);

    // Owning iteration must observe the same entries.
    let (mut in_a, mut in_b) = (0, 0);
    let mut seen = Vec::new();
    for (collection, document) in changes {
        assert!(!seen.contains(&document.id));
        seen.push(document.id.clone());
        match collection.name.as_ref() {
            "a" => in_a += 1,
            "b" => in_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(in_a, 2);
    assert_eq!(in_b, 1);
}
/// A record of a change to one document, stored inside [`DocumentChanges`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangedDocument {
/// Index into [`DocumentChanges::collections`] naming the collection this
/// document refers to.
pub collection: u16,
/// The id of the changed document.
pub id: DocumentId,
/// NOTE(review): presumably `true` when the change removed the document —
/// confirm against the code that produces these records.
pub deleted: bool,
}
/// A record of a change to one key, carried by [`Changes::Keys`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangedKey {
/// The namespace of the key, if it has one.
pub namespace: Option<String>,
/// The key that changed.
pub key: String,
/// NOTE(review): presumably `true` when the change removed the key —
/// confirm against the code that produces these records.
pub deleted: bool,
}