bonsaidb_core/document/
collection.rs

1use std::borrow::Cow;
2use std::fmt::Debug;
3
4use arc_bytes::serde::{Bytes, CowBytes};
5use serde::de::{self, Visitor};
6use serde::ser::SerializeStruct;
7use serde::{Deserialize, Serialize};
8
9use crate::connection::{AsyncConnection, Connection};
10use crate::document::{
11    BorrowedDocument, CollectionHeader, DocumentId, HasHeader, Header, OwnedDocument,
12};
13use crate::schema::SerializedCollection;
14use crate::transaction::{Operation, Transaction};
15use crate::Error;
16
17/// A document with serializable contents.
18#[derive(Clone, Debug, Eq, PartialEq)]
19pub struct CollectionDocument<C>
20where
21    C: SerializedCollection,
22{
23    /// The header of the document, which contains the id and `Revision`.
24    pub header: CollectionHeader<C::PrimaryKey>,
25
26    /// The document's contents.
27    pub contents: C::Contents,
28}
29
30impl<'a, C> TryFrom<&'a BorrowedDocument<'a>> for CollectionDocument<C>
31where
32    C: SerializedCollection,
33{
34    type Error = Error;
35
36    fn try_from(value: &'a BorrowedDocument<'a>) -> Result<Self, Self::Error> {
37        Ok(Self {
38            contents: C::deserialize(&value.contents)?,
39            header: CollectionHeader::try_from(value.header.clone())?,
40        })
41    }
42}
43
44impl<'a, C> TryFrom<&'a OwnedDocument> for CollectionDocument<C>
45where
46    C: SerializedCollection,
47{
48    type Error = Error;
49
50    fn try_from(value: &'a OwnedDocument) -> Result<Self, Self::Error> {
51        Ok(Self {
52            contents: C::deserialize(&value.contents)?,
53            header: CollectionHeader::try_from(value.header.clone())?,
54        })
55    }
56}
57
58impl<'a, 'b, C> TryFrom<&'b CollectionDocument<C>> for BorrowedDocument<'a>
59where
60    C: SerializedCollection,
61{
62    type Error = crate::Error;
63
64    fn try_from(value: &'b CollectionDocument<C>) -> Result<Self, Self::Error> {
65        Ok(Self {
66            contents: CowBytes::from(C::serialize(&value.contents)?),
67            header: Header::try_from(value.header.clone())?,
68        })
69    }
70}
71
72impl<C> CollectionDocument<C>
73where
74    C: SerializedCollection,
75{
76    /// Updates the document stored in the database with the contents of this
77    /// collection document.
78    ///
79    /// ```rust
80    /// # bonsaidb_core::__doctest_prelude!();
81    /// # use bonsaidb_core::connection::Connection;
82    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
83    /// if let Some(mut document) = MyCollection::get(&42, &db)? {
84    ///     // ... do something `document`
85    ///     document.update(&db)?;
86    ///     println!(
87    ///         "The document has been updated: {:?}",
88    ///         document.header.revision
89    ///     );
90    /// }
91    /// # Ok(())
92    /// # }
93    /// ```
94    pub fn update<Cn: Connection>(&mut self, connection: &Cn) -> Result<(), Error> {
95        let mut doc = self.to_document()?;
96
97        connection.update::<C, _>(&mut doc)?;
98
99        self.header = CollectionHeader::try_from(doc.header)?;
100
101        Ok(())
102    }
103
104    /// Pushes an update [`Operation`] to the transaction for this document.
105    ///
106    /// The changes will happen once the transaction is applied.
107    pub fn update_in_transaction(&self, transaction: &mut Transaction) -> Result<(), Error> {
108        transaction.push(Operation::update_serialized::<C>(
109            self.header.clone(),
110            &self.contents,
111        )?);
112        Ok(())
113    }
114
115    /// Stores the new value of `contents` in the document.
116    ///
117    /// ```rust
118    /// # bonsaidb_core::__doctest_prelude!();
119    /// # use bonsaidb_core::connection::AsyncConnection;
120    /// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
121    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
122    /// if let Some(mut document) = MyCollection::get_async(&42, &db).await? {
123    ///     // modify the document
124    ///     document.update_async(&db).await?;
125    ///     println!("Updated revision: {:?}", document.header.revision);
126    /// }
127    /// # Ok(())
128    /// # })
129    /// # }
130    /// ```
131    pub async fn update_async<Cn: AsyncConnection>(
132        &mut self,
133        connection: &Cn,
134    ) -> Result<(), Error> {
135        let mut doc = self.to_document()?;
136
137        connection.update::<C, _>(&mut doc).await?;
138
139        self.header = CollectionHeader::try_from(doc.header)?;
140
141        Ok(())
142    }
143
144    /// Modifies `self`, automatically retrying the modification if the document
145    /// has been updated on the server.
146    ///
147    /// ## Data loss warning
148    ///
149    /// If you've modified `self` before calling this function and a conflict
150    /// occurs, all changes to self will be lost when the current document is
151    /// fetched before retrying the process again. When you use this function,
152    /// you should limit the edits to the value to within the `modifier`
153    /// callback.
154    pub fn modify<Cn: Connection, Modifier: FnMut(&mut Self) + Send + Sync>(
155        &mut self,
156        connection: &Cn,
157        mut modifier: Modifier,
158    ) -> Result<(), Error>
159    where
160        C::Contents: Clone,
161    {
162        let mut is_first_loop = true;
163        // TODO this should have a retry-limit.
164        loop {
165            // On the first attempt, we want to try sending the update to the
166            // database without fetching new contents. If we receive a conflict,
167            // on future iterations we will first re-load the data.
168            if is_first_loop {
169                is_first_loop = false;
170            } else {
171                *self =
172                    C::get(&self.header.id, connection)?.ok_or_else(|| {
173                        match DocumentId::new(&self.header.id) {
174                            Ok(id) => Error::DocumentNotFound(C::collection_name(), Box::new(id)),
175                            Err(err) => err,
176                        }
177                    })?;
178            }
179            modifier(&mut *self);
180            match self.update(connection) {
181                Err(Error::DocumentConflict(..)) => {}
182                other => return other,
183            }
184        }
185    }
186
187    /// Modifies `self`, automatically retrying the modification if the document
188    /// has been updated on the server.
189    ///
190    /// ## Data loss warning
191    ///
192    /// If you've modified `self` before calling this function and a conflict
193    /// occurs, all changes to self will be lost when the current document is
194    /// fetched before retrying the process again. When you use this function,
195    /// you should limit the edits to the value to within the `modifier`
196    /// callback.
197    pub async fn modify_async<Cn: AsyncConnection, Modifier: FnMut(&mut Self) + Send + Sync>(
198        &mut self,
199        connection: &Cn,
200        mut modifier: Modifier,
201    ) -> Result<(), Error>
202    where
203        C::Contents: Clone,
204    {
205        let mut is_first_loop = true;
206        // TODO this should have a retry-limit.
207        loop {
208            // On the first attempt, we want to try sending the update to the
209            // database without fetching new contents. If we receive a conflict,
210            // on future iterations we will first re-load the data.
211            if is_first_loop {
212                is_first_loop = false;
213            } else {
214                *self = C::get_async(&self.header.id, connection)
215                    .await?
216                    .ok_or_else(|| match DocumentId::new(&self.header.id) {
217                        Ok(id) => Error::DocumentNotFound(C::collection_name(), Box::new(id)),
218                        Err(err) => err,
219                    })?;
220            }
221            modifier(&mut *self);
222            match self.update_async(connection).await {
223                Err(Error::DocumentConflict(..)) => {}
224                other => return other,
225            }
226        }
227    }
228
229    /// Removes the document from the collection.
230    ///
231    /// ```rust
232    /// # bonsaidb_core::__doctest_prelude!();
233    /// # use bonsaidb_core::connection::Connection;
234    /// # fn test_fn<C: Connection>(db: C) -> Result<(), Error> {
235    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
236    /// if let Some(document) = MyCollection::get(&42, &db)? {
237    ///     document.delete(&db)?;
238    /// }
239    /// # Ok(())
240    /// # })
241    /// # }
242    /// ```
243    pub fn delete<Cn: Connection>(&self, connection: &Cn) -> Result<(), Error> {
244        connection.collection::<C>().delete(self)?;
245
246        Ok(())
247    }
248
249    /// Removes the document from the collection.
250    ///
251    /// ```rust
252    /// # bonsaidb_core::__doctest_prelude!();
253    /// # use bonsaidb_core::connection::AsyncConnection;
254    /// # fn test_fn<C: AsyncConnection>(db: C) -> Result<(), Error> {
255    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
256    /// if let Some(document) = MyCollection::get_async(&42, &db).await? {
257    ///     document.delete_async(&db).await?;
258    /// }
259    /// # Ok(())
260    /// # })
261    /// # }
262    /// ```
263    pub async fn delete_async<Cn: AsyncConnection>(&self, connection: &Cn) -> Result<(), Error> {
264        connection.collection::<C>().delete(self).await?;
265
266        Ok(())
267    }
268
269    /// Pushes a delete [`Operation`] to the transaction for this document.
270    ///
271    /// The document will be deleted once the transaction is applied.
272    pub fn delete_in_transaction(&self, transaction: &mut Transaction) -> Result<(), Error> {
273        transaction.push(Operation::delete(C::collection_name(), self.header()?));
274        Ok(())
275    }
276
277    /// Refreshes this instance from `connection`. If the document is no longer
278    /// present, [`Error::DocumentNotFound`] will be returned.
279    pub fn refresh<Cn: Connection>(&mut self, connection: &Cn) -> Result<(), Error> {
280        let id = DocumentId::new(&self.header.id)?;
281        *self = C::get(&id, connection)?
282            .ok_or_else(|| Error::DocumentNotFound(C::collection_name(), Box::new(id)))?;
283        Ok(())
284    }
285
286    /// Refreshes this instance from `connection`. If the document is no longer
287    /// present, [`Error::DocumentNotFound`] will be returned.
288    pub async fn refresh_async<Cn: AsyncConnection>(
289        &mut self,
290        connection: &Cn,
291    ) -> Result<(), Error> {
292        let id = DocumentId::new(&self.header.id)?;
293        *self = C::get_async(&id, connection)
294            .await?
295            .ok_or_else(|| Error::DocumentNotFound(C::collection_name(), Box::new(id)))?;
296        Ok(())
297    }
298
299    /// Converts this value to a serialized `Document`.
300    pub fn to_document(&self) -> Result<OwnedDocument, Error> {
301        Ok(OwnedDocument {
302            contents: Bytes::from(C::serialize(&self.contents)?),
303            header: Header::try_from(self.header.clone())?,
304        })
305    }
306}
307
308impl<C> Serialize for CollectionDocument<C>
309where
310    C: SerializedCollection,
311    C::Contents: Serialize,
312    C::PrimaryKey: Serialize,
313{
314    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
315    where
316        S: serde::Serializer,
317    {
318        let mut s = serializer.serialize_struct("CollectionDocument", 2)?;
319        s.serialize_field("header", &self.header)?;
320        s.serialize_field("contents", &self.contents)?;
321        s.end()
322    }
323}
324
325impl<'de, C> Deserialize<'de> for CollectionDocument<C>
326where
327    C: SerializedCollection,
328    C::PrimaryKey: Deserialize<'de>,
329    C::Contents: Deserialize<'de>,
330{
331    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
332    where
333        D: serde::Deserializer<'de>,
334    {
335        struct CollectionDocumentVisitor<C>
336        where
337            C: SerializedCollection,
338        {
339            header: Option<CollectionHeader<C::PrimaryKey>>,
340            contents: Option<C::Contents>,
341        }
342
343        impl<C> Default for CollectionDocumentVisitor<C>
344        where
345            C: SerializedCollection,
346        {
347            fn default() -> Self {
348                Self {
349                    header: None,
350                    contents: None,
351                }
352            }
353        }
354
355        impl<'de, C> Visitor<'de> for CollectionDocumentVisitor<C>
356        where
357            C: SerializedCollection,
358            C::PrimaryKey: Deserialize<'de>,
359            C::Contents: Deserialize<'de>,
360        {
361            type Value = CollectionDocument<C>;
362
363            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364                formatter.write_str("a collection document")
365            }
366
367            fn visit_map<A>(mut self, mut map: A) -> Result<Self::Value, A::Error>
368            where
369                A: serde::de::MapAccess<'de>,
370            {
371                while let Some(key) = map.next_key::<Cow<'_, str>>()? {
372                    match key.as_ref() {
373                        "header" => {
374                            self.header = Some(map.next_value()?);
375                        }
376                        "contents" => {
377                            self.contents = Some(map.next_value()?);
378                        }
379                        _ => {
380                            return Err(<A::Error as de::Error>::custom(format!(
381                                "unknown field {key}"
382                            )))
383                        }
384                    }
385                }
386
387                Ok(CollectionDocument {
388                    header: self
389                        .header
390                        .ok_or_else(|| <A::Error as de::Error>::custom("`header` missing"))?,
391                    contents: self
392                        .contents
393                        .ok_or_else(|| <A::Error as de::Error>::custom("`contents` missing"))?,
394                })
395            }
396
397            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
398            where
399                A: de::SeqAccess<'de>,
400            {
401                let header = seq
402                    .next_element()?
403                    .ok_or_else(|| <A::Error as de::Error>::custom("`header` missing"))?;
404                let contents = seq
405                    .next_element()?
406                    .ok_or_else(|| <A::Error as de::Error>::custom("`contents` missing"))?;
407                Ok(CollectionDocument { header, contents })
408            }
409        }
410
411        deserializer.deserialize_struct(
412            "CollectionDocument",
413            &["header", "contents"],
414            CollectionDocumentVisitor::default(),
415        )
416    }
417}
418
419/// Helper functions for a slice of [`OwnedDocument`]s.
420pub trait OwnedDocuments {
421    /// Returns a list of deserialized documents.
422    fn collection_documents<C: SerializedCollection>(
423        &self,
424    ) -> Result<Vec<CollectionDocument<C>>, Error>;
425}
426
427impl OwnedDocuments for [OwnedDocument] {
428    fn collection_documents<C: SerializedCollection>(
429        &self,
430    ) -> Result<Vec<CollectionDocument<C>>, Error> {
431        self.iter().map(CollectionDocument::try_from).collect()
432    }
433}
434
435#[test]
436fn collection_document_serialization() {
437    use crate::test_util::Basic;
438
439    let original: CollectionDocument<Basic> = CollectionDocument {
440        header: CollectionHeader {
441            id: 1,
442            revision: super::Revision::new(b"hello world"),
443        },
444        contents: Basic::new("test"),
445    };
446
447    // Pot uses a map to represent a struct
448    let pot = pot::to_vec(&original).unwrap();
449    assert_eq!(
450        pot::from_slice::<CollectionDocument<Basic>>(&pot).unwrap(),
451        original
452    );
453    // Bincode uses a sequence to represent a struct
454    let bincode = transmog_bincode::bincode::serialize(&original).unwrap();
455    assert_eq!(
456        transmog_bincode::bincode::deserialize::<CollectionDocument<Basic>>(&bincode).unwrap(),
457        original
458    );
459}