1use std::borrow::Cow;
2use std::fmt::Debug;
3
4use arc_bytes::serde::{Bytes, CowBytes};
5use serde::de::{self, Visitor};
6use serde::ser::SerializeStruct;
7use serde::{Deserialize, Serialize};
8
9use crate::connection::{AsyncConnection, Connection};
10use crate::document::{
11 BorrowedDocument, CollectionHeader, DocumentId, HasHeader, Header, OwnedDocument,
12};
13use crate::schema::SerializedCollection;
14use crate::transaction::{Operation, Transaction};
15use crate::Error;
16
17#[derive(Clone, Debug, Eq, PartialEq)]
19pub struct CollectionDocument<C>
20where
21 C: SerializedCollection,
22{
23 pub header: CollectionHeader<C::PrimaryKey>,
25
26 pub contents: C::Contents,
28}
29
30impl<'a, C> TryFrom<&'a BorrowedDocument<'a>> for CollectionDocument<C>
31where
32 C: SerializedCollection,
33{
34 type Error = Error;
35
36 fn try_from(value: &'a BorrowedDocument<'a>) -> Result<Self, Self::Error> {
37 Ok(Self {
38 contents: C::deserialize(&value.contents)?,
39 header: CollectionHeader::try_from(value.header.clone())?,
40 })
41 }
42}
43
44impl<'a, C> TryFrom<&'a OwnedDocument> for CollectionDocument<C>
45where
46 C: SerializedCollection,
47{
48 type Error = Error;
49
50 fn try_from(value: &'a OwnedDocument) -> Result<Self, Self::Error> {
51 Ok(Self {
52 contents: C::deserialize(&value.contents)?,
53 header: CollectionHeader::try_from(value.header.clone())?,
54 })
55 }
56}
57
58impl<'a, 'b, C> TryFrom<&'b CollectionDocument<C>> for BorrowedDocument<'a>
59where
60 C: SerializedCollection,
61{
62 type Error = crate::Error;
63
64 fn try_from(value: &'b CollectionDocument<C>) -> Result<Self, Self::Error> {
65 Ok(Self {
66 contents: CowBytes::from(C::serialize(&value.contents)?),
67 header: Header::try_from(value.header.clone())?,
68 })
69 }
70}
71
72impl<C> CollectionDocument<C>
73where
74 C: SerializedCollection,
75{
76 pub fn update<Cn: Connection>(&mut self, connection: &Cn) -> Result<(), Error> {
95 let mut doc = self.to_document()?;
96
97 connection.update::<C, _>(&mut doc)?;
98
99 self.header = CollectionHeader::try_from(doc.header)?;
100
101 Ok(())
102 }
103
104 pub fn update_in_transaction(&self, transaction: &mut Transaction) -> Result<(), Error> {
108 transaction.push(Operation::update_serialized::<C>(
109 self.header.clone(),
110 &self.contents,
111 )?);
112 Ok(())
113 }
114
115 pub async fn update_async<Cn: AsyncConnection>(
132 &mut self,
133 connection: &Cn,
134 ) -> Result<(), Error> {
135 let mut doc = self.to_document()?;
136
137 connection.update::<C, _>(&mut doc).await?;
138
139 self.header = CollectionHeader::try_from(doc.header)?;
140
141 Ok(())
142 }
143
144 pub fn modify<Cn: Connection, Modifier: FnMut(&mut Self) + Send + Sync>(
155 &mut self,
156 connection: &Cn,
157 mut modifier: Modifier,
158 ) -> Result<(), Error>
159 where
160 C::Contents: Clone,
161 {
162 let mut is_first_loop = true;
163 loop {
165 if is_first_loop {
169 is_first_loop = false;
170 } else {
171 *self =
172 C::get(&self.header.id, connection)?.ok_or_else(|| {
173 match DocumentId::new(&self.header.id) {
174 Ok(id) => Error::DocumentNotFound(C::collection_name(), Box::new(id)),
175 Err(err) => err,
176 }
177 })?;
178 }
179 modifier(&mut *self);
180 match self.update(connection) {
181 Err(Error::DocumentConflict(..)) => {}
182 other => return other,
183 }
184 }
185 }
186
187 pub async fn modify_async<Cn: AsyncConnection, Modifier: FnMut(&mut Self) + Send + Sync>(
198 &mut self,
199 connection: &Cn,
200 mut modifier: Modifier,
201 ) -> Result<(), Error>
202 where
203 C::Contents: Clone,
204 {
205 let mut is_first_loop = true;
206 loop {
208 if is_first_loop {
212 is_first_loop = false;
213 } else {
214 *self = C::get_async(&self.header.id, connection)
215 .await?
216 .ok_or_else(|| match DocumentId::new(&self.header.id) {
217 Ok(id) => Error::DocumentNotFound(C::collection_name(), Box::new(id)),
218 Err(err) => err,
219 })?;
220 }
221 modifier(&mut *self);
222 match self.update_async(connection).await {
223 Err(Error::DocumentConflict(..)) => {}
224 other => return other,
225 }
226 }
227 }
228
229 pub fn delete<Cn: Connection>(&self, connection: &Cn) -> Result<(), Error> {
244 connection.collection::<C>().delete(self)?;
245
246 Ok(())
247 }
248
249 pub async fn delete_async<Cn: AsyncConnection>(&self, connection: &Cn) -> Result<(), Error> {
264 connection.collection::<C>().delete(self).await?;
265
266 Ok(())
267 }
268
269 pub fn delete_in_transaction(&self, transaction: &mut Transaction) -> Result<(), Error> {
273 transaction.push(Operation::delete(C::collection_name(), self.header()?));
274 Ok(())
275 }
276
277 pub fn refresh<Cn: Connection>(&mut self, connection: &Cn) -> Result<(), Error> {
280 let id = DocumentId::new(&self.header.id)?;
281 *self = C::get(&id, connection)?
282 .ok_or_else(|| Error::DocumentNotFound(C::collection_name(), Box::new(id)))?;
283 Ok(())
284 }
285
286 pub async fn refresh_async<Cn: AsyncConnection>(
289 &mut self,
290 connection: &Cn,
291 ) -> Result<(), Error> {
292 let id = DocumentId::new(&self.header.id)?;
293 *self = C::get_async(&id, connection)
294 .await?
295 .ok_or_else(|| Error::DocumentNotFound(C::collection_name(), Box::new(id)))?;
296 Ok(())
297 }
298
299 pub fn to_document(&self) -> Result<OwnedDocument, Error> {
301 Ok(OwnedDocument {
302 contents: Bytes::from(C::serialize(&self.contents)?),
303 header: Header::try_from(self.header.clone())?,
304 })
305 }
306}
307
308impl<C> Serialize for CollectionDocument<C>
309where
310 C: SerializedCollection,
311 C::Contents: Serialize,
312 C::PrimaryKey: Serialize,
313{
314 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
315 where
316 S: serde::Serializer,
317 {
318 let mut s = serializer.serialize_struct("CollectionDocument", 2)?;
319 s.serialize_field("header", &self.header)?;
320 s.serialize_field("contents", &self.contents)?;
321 s.end()
322 }
323}
324
325impl<'de, C> Deserialize<'de> for CollectionDocument<C>
326where
327 C: SerializedCollection,
328 C::PrimaryKey: Deserialize<'de>,
329 C::Contents: Deserialize<'de>,
330{
331 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
332 where
333 D: serde::Deserializer<'de>,
334 {
335 struct CollectionDocumentVisitor<C>
336 where
337 C: SerializedCollection,
338 {
339 header: Option<CollectionHeader<C::PrimaryKey>>,
340 contents: Option<C::Contents>,
341 }
342
343 impl<C> Default for CollectionDocumentVisitor<C>
344 where
345 C: SerializedCollection,
346 {
347 fn default() -> Self {
348 Self {
349 header: None,
350 contents: None,
351 }
352 }
353 }
354
355 impl<'de, C> Visitor<'de> for CollectionDocumentVisitor<C>
356 where
357 C: SerializedCollection,
358 C::PrimaryKey: Deserialize<'de>,
359 C::Contents: Deserialize<'de>,
360 {
361 type Value = CollectionDocument<C>;
362
363 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364 formatter.write_str("a collection document")
365 }
366
367 fn visit_map<A>(mut self, mut map: A) -> Result<Self::Value, A::Error>
368 where
369 A: serde::de::MapAccess<'de>,
370 {
371 while let Some(key) = map.next_key::<Cow<'_, str>>()? {
372 match key.as_ref() {
373 "header" => {
374 self.header = Some(map.next_value()?);
375 }
376 "contents" => {
377 self.contents = Some(map.next_value()?);
378 }
379 _ => {
380 return Err(<A::Error as de::Error>::custom(format!(
381 "unknown field {key}"
382 )))
383 }
384 }
385 }
386
387 Ok(CollectionDocument {
388 header: self
389 .header
390 .ok_or_else(|| <A::Error as de::Error>::custom("`header` missing"))?,
391 contents: self
392 .contents
393 .ok_or_else(|| <A::Error as de::Error>::custom("`contents` missing"))?,
394 })
395 }
396
397 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
398 where
399 A: de::SeqAccess<'de>,
400 {
401 let header = seq
402 .next_element()?
403 .ok_or_else(|| <A::Error as de::Error>::custom("`header` missing"))?;
404 let contents = seq
405 .next_element()?
406 .ok_or_else(|| <A::Error as de::Error>::custom("`contents` missing"))?;
407 Ok(CollectionDocument { header, contents })
408 }
409 }
410
411 deserializer.deserialize_struct(
412 "CollectionDocument",
413 &["header", "contents"],
414 CollectionDocumentVisitor::default(),
415 )
416 }
417}
418
419pub trait OwnedDocuments {
421 fn collection_documents<C: SerializedCollection>(
423 &self,
424 ) -> Result<Vec<CollectionDocument<C>>, Error>;
425}
426
427impl OwnedDocuments for [OwnedDocument] {
428 fn collection_documents<C: SerializedCollection>(
429 &self,
430 ) -> Result<Vec<CollectionDocument<C>>, Error> {
431 self.iter().map(CollectionDocument::try_from).collect()
432 }
433}
434
435#[test]
436fn collection_document_serialization() {
437 use crate::test_util::Basic;
438
439 let original: CollectionDocument<Basic> = CollectionDocument {
440 header: CollectionHeader {
441 id: 1,
442 revision: super::Revision::new(b"hello world"),
443 },
444 contents: Basic::new("test"),
445 };
446
447 let pot = pot::to_vec(&original).unwrap();
449 assert_eq!(
450 pot::from_slice::<CollectionDocument<Basic>>(&pot).unwrap(),
451 original
452 );
453 let bincode = transmog_bincode::bincode::serialize(&original).unwrap();
455 assert_eq!(
456 transmog_bincode::bincode::deserialize::<CollectionDocument<Basic>>(&bincode).unwrap(),
457 original
458 );
459}