dittolive-ditto 4.13.0

Ditto is a peer to peer cross-platform database that allows mobile, web, IoT and server apps to sync with or without an internet connection.
Documentation
//! Use attachments to sync large binary files between peers.

use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
};

use ffi_sdk::{BoxedAttachmentHandle, BoxedDitto};
use safer_ffi::prelude::*;
use serde::ser::SerializeMap;

use crate::{ditto::TryUpgrade, prelude::DittoError, utils::prelude::ErrorKind};

// TODO(v5): Remove pub
#[doc(hidden)]
pub mod fetch_event;
pub use self::fetch_event::DittoAttachmentFetchEvent;

// TODO(v5): Remove pub
#[doc(hidden)]
pub mod fetcher;
pub use self::fetcher::{DittoAttachmentFetcher, FetcherVersion};

// TODO(v5): Remove pub
#[doc(hidden)]
pub mod token;
pub use self::token::{DittoAttachmentToken, DittoAttachmentTokenLike};

#[derive(Debug)]
/// Represents an attachment and can be used to insert the associated attachment into a document at
/// a specific key.
pub struct DittoAttachment {
    id: Box<[u8]>,
    len: u64,
    metadata: HashMap<String, String>,
    ditto: Weak<BoxedDitto>,
    attachment_handle: BoxedAttachmentHandle,
}

impl DittoAttachment {
    /// Returns the `id` of this attachment, encoded so as to be compatible with the
    /// [`crate::store::dql::Query`] API.
    pub fn id(&self) -> String {
        crate::utils::base64_encode_unpadded(&self.id)
    }

    /// Returns the `len`, in bytes, of this attachment's data. Compatible with the
    /// [`crate::store::dql::Query`] API.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> u64 {
        self.len
    }

    /// Returns the metadata that was associated with this attachment file when the source peer
    /// called [`new_attachment`][crate::store::Store::new_attachment].
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.metadata
    }
}

impl serde::Serialize for DittoAttachment {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut map = serializer.serialize_map(Some(4))?;
        map.serialize_entry(
            // This breaks compatibility with the Rust SDK prior
            // to versions 3.0.8 and 4.0.0. It is necessary for
            // cross-SDK attachments compatibility.
            "_ditto_internal_type_jkb12973t4b",
            &(::ffi_sdk::DittoCrdtType::Attachment as u64),
        )?;
        map.serialize_entry("_id", ::serde_bytes::Bytes::new(&self.id[..]))?;
        map.serialize_entry("_len", &self.len)?;
        map.serialize_entry("_meta", &self.metadata)?;
        map.end()
    }
}

impl DittoAttachment {
    /// Create a new DittoAttachment
    pub(crate) fn new(
        id: Box<[u8]>,
        len: u64,
        metadata: HashMap<String, String>,
        ditto: Weak<BoxedDitto>,
        attachment_handle: BoxedAttachmentHandle,
    ) -> Self {
        Self {
            id,
            len,
            metadata,
            ditto,
            attachment_handle,
        }
    }

    pub(crate) fn from_file_and_metadata(
        filepath: &(impl ?Sized + AsRef<Path>),
        metadata: HashMap<String, String>,
        ditto: &Arc<ffi_sdk::BoxedDitto>,
    ) -> Result<DittoAttachment, DittoError> {
        let source_path = char_p::new(filepath.as_ref().to_str().unwrap());
        let file_operation = ffi_sdk::AttachmentFileOperation::Copy;
        let mut slot = ::core::mem::MaybeUninit::<ffi_sdk::Attachment>::uninit();
        let status = {
            ffi_sdk::ditto_new_attachment_from_file(
                ditto,
                source_path.as_ref(),
                file_operation,
                slot.as_out(),
            )
        };
        if status != 0 {
            Err(DittoError::from_ffi(ErrorKind::InvalidInput))
        } else {
            let attachment = unsafe { slot.assume_init() }; // safe assuming above ffi call was successful
            let ret = DittoAttachment::new(
                attachment.id.into(),
                attachment.len,
                metadata,
                Arc::downgrade(ditto),
                attachment.handle,
            );
            Ok(ret)
        }
    }

    pub(crate) fn from_bytes_and_metadata(
        bytes: &(impl ?Sized + AsRef<[u8]>),
        metadata: HashMap<String, String>,
        ditto: &Arc<ffi_sdk::BoxedDitto>,
    ) -> Result<DittoAttachment, DittoError> {
        let mut slot = ::core::mem::MaybeUninit::<ffi_sdk::Attachment>::uninit();
        let status = {
            ffi_sdk::ditto_new_attachment_from_bytes(ditto, bytes.as_ref().into(), slot.as_out())
        };
        if status != 0 {
            Err(DittoError::from_ffi(ErrorKind::InvalidInput))
        } else {
            let attachment = unsafe { slot.assume_init() }; // safe assuming above ffi call was successful
            let ret = DittoAttachment::new(
                attachment.id.into(),
                attachment.len,
                metadata,
                Arc::downgrade(ditto),
                attachment.handle,
            );
            Ok(ret)
        }
    }

    /// Create a new DittoAttachment from a Token
    pub(crate) fn new_with_token(
        token: DittoAttachmentToken,
        ditto: Weak<BoxedDitto>,
        attachment_handle: BoxedAttachmentHandle,
    ) -> Self {
        Self {
            id: token.id,
            len: token.len,
            metadata: token.metadata,
            ditto,
            attachment_handle,
        }
    }

    /// Return path to an attachment.
    /// # Panics
    /// Panics if Ditto has been released
    pub fn path(&self) -> PathBuf {
        // FIXME(Ronan) Ideally wrap this function in a Result
        let ditto = self.ditto.try_upgrade().unwrap();
        let p = ffi_sdk::ditto_get_complete_attachment_path(&ditto, &self.attachment_handle);
        let p_string = p.to_string();
        p_string.into()
    }
}

#[cfg(test)]
#[allow(deprecated)]
// TODO(v5): remove query_builder tests
mod tests {
    use std::{
        collections::HashMap,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, Mutex,
        },
    };

    use serde_json::json;

    use crate::{
        prelude::*,
        store::{
            ditto_attachment_fetch_event::DittoAttachmentFetchEvent,
            ditto_attachment_token::DittoAttachmentToken,
        },
        test_helpers::setup_ditto,
    };

    #[::tokio::test]
    async fn attachment_serialize() {
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();

        let original_test_file_path = "tests/data/attachment_file_1.txt";

        let metadata = {
            let mut m = HashMap::new();
            m.insert("key_1".to_string(), "value_1".to_string());
            m.insert("key_2".to_string(), "value_2".to_string());
            m
        };

        let attachment = store
            .new_attachment(original_test_file_path, metadata.clone())
            .await
            .expect("new_attachment");
        let attachment_id = attachment.id.clone();
        let attachment_len = attachment.len;
        let attachment_file_path = attachment.path();
        assert_ne!(
            original_test_file_path,
            attachment_file_path
                .clone()
                .into_os_string()
                .into_string()
                .unwrap()
        );

        // We want to test that you can upsert with an attachment when the document's contents is
        // provided as JSON or as CBOR (so that the attachment's ID gets serialized as an array or a
        // binary blob respectively)
        let id = collection
            .upsert(json!({ "hello": "again", "att": attachment }))
            .unwrap();

        {
            let mut map = HashMap::new();
            map.insert("hello", serde_cbor::value::to_value("again").unwrap());
            map.insert("att", serde_cbor::value::to_value(&attachment).unwrap());
            let _other = collection.upsert(map).unwrap();
        }

        let mut doc = collection.find_by_id(id).exec().unwrap();

        // TODO(Ham): We should not be able to call `set` on a document returned by a call to `exec`
        let set = doc.set("att_two", &attachment);
        assert!(set.is_ok());

        let attachment_token = doc.get::<DittoAttachmentToken>("att").unwrap();
        assert_eq!(attachment_token.id, attachment_id);
        assert_eq!(attachment_token.len, attachment_len);
        assert_eq!(attachment_token.metadata, metadata);

        let attachment_token_two = doc.get::<DittoAttachmentToken>("att_two").unwrap();
        assert_eq!(attachment_token.id, attachment_token_two.id);
        assert_eq!(attachment_token.len, attachment_token_two.len);
        assert_eq!(attachment_token.metadata, attachment_token_two.metadata);

        let test_file = std::fs::read(original_test_file_path).unwrap();
        let attachment_file = std::fs::read(attachment_file_path).unwrap();

        assert_eq!(test_file, attachment_file);

        assert_eq!(test_file.len() as u64, attachment_len);
    }

    #[test]
    fn attachment_fetch_legacy() {
        #![allow(deprecated)]
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();

        let original_test_file_path = "tests/data/attachment_file_1.txt";

        let attachment = collection
            .new_attachment(original_test_file_path, HashMap::new())
            .expect("new_attachment");

        let collection = store.collection("test").unwrap();
        let id = collection.upsert(json!({"hello": "again"})).unwrap();
        let mut doc = collection.find_by_id(id).exec().unwrap();

        let set = doc.set("att", attachment);
        assert!(set.is_ok());

        let attachment_token = doc.get::<DittoAttachmentToken>("att").unwrap();

        let finished = Arc::new(AtomicBool::new(false));
        let finished_clone = Arc::clone(&finished);
        let fetched_attachment_data: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(vec![]));

        let _fetcher = collection
            .fetch_attachment(attachment_token, |event| {
                if let DittoAttachmentFetchEvent::Completed { attachment } = event {
                    let att_data_mtx = &*fetched_attachment_data; // move (copy) and reborrow
                    if let Ok(mut fetched_attachment_data) = att_data_mtx.lock() {
                        *fetched_attachment_data = std::fs::read(attachment.path()).unwrap();
                        finished_clone.store(true, Ordering::SeqCst);
                    }
                }
            })
            .unwrap();

        while !finished.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }

        let test_file_data = std::fs::read(original_test_file_path).unwrap();
        let fetched_att_data = fetched_attachment_data.lock().unwrap();
        assert_eq!(test_file_data, *fetched_att_data);
    }

    #[::tokio::test]
    async fn attachment_fetch() {
        let ditto = setup_ditto().unwrap();
        let store = ditto.store();
        let collection = store.collection("test").unwrap();

        let original_test_file_path = "tests/data/attachment_file_1.txt";

        let attachment = store
            .new_attachment(original_test_file_path, HashMap::new())
            .await
            .expect("new_attachment");

        let id = collection.upsert(json!({"hello": "again"})).unwrap();
        let mut doc = collection.find_by_id(id).exec().unwrap();

        let set = doc.set("att", attachment);
        assert!(set.is_ok());

        let attachment_token = doc.get::<DittoAttachmentToken>("att").unwrap();

        let finished = Arc::new(AtomicBool::new(false));
        let finished_clone = Arc::clone(&finished);
        let fetched_attachment_data: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(vec![]));

        let _fetcher = store
            .fetch_attachment(attachment_token, {
                let fetched_attachment_data = fetched_attachment_data.clone();
                move |event| {
                    if let DittoAttachmentFetchEvent::Completed { attachment } = event {
                        let att_data_mtx = &*fetched_attachment_data; // move (copy) and reborrow
                        if let Ok(mut fetched_attachment_data) = att_data_mtx.lock() {
                            *fetched_attachment_data = std::fs::read(attachment.path()).unwrap();
                            finished_clone.store(true, Ordering::SeqCst);
                        }
                    }
                }
            })
            .unwrap();

        while !finished.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }

        let test_file_data = std::fs::read(original_test_file_path).unwrap();
        let fetched_att_data = fetched_attachment_data.lock().unwrap();
        assert_eq!(test_file_data, *fetched_att_data);
    }
}