dittolive-ditto 5.0.0

Ditto is a peer-to-peer, cross-platform database that allows mobile, web, IoT, and server apps to sync with or without an internet connection.
Documentation
//! Test that syncs the current (latest) Rust SDK against the oldest supported Ditto version.
//!
//! The counterpart, which inserts the doc and listens on TCP, is in `/tools/oldest-sdk-test`.

use std::{env, error::Error, fs::File, io::Read, process, sync::Arc, time::Duration};

use dittolive_ditto::{fs::TempRoot, prelude::*};
use serde_json::json;

/// Bytes the fetched attachment is expected to contain; `main` compares the contents of the
/// downloaded attachment file against this value.
const EXAMPLE_ATTACHMENT_DATA: &[u8] = b"Some data";

/// Runs the sync check: connects to a peer over TCP, waits for a document to arrive, then
/// fetches the attachment referenced by that document and verifies its contents.
///
/// Positional command-line arguments:
/// 1. the App ID, used as the database id,
/// 2. the ID of the document to look for in `test_coll`,
/// 3. the port on `127.0.0.1` to connect to.
///
/// The license is configured through `set_license_from_env("DITTO_LICENSE")`.
///
/// On success the process exits with status 0 from inside the attachment callback, right
/// after printing the confirmation line.
///
/// # Errors
///
/// Returns an error if Ditto cannot be opened, licensed or started, if the fetched attachment
/// cannot be read or does not match [`EXAMPLE_ATTACHMENT_DATA`], or if the attachment has not
/// been verified within 10 seconds (`"timed out"`).
///
/// # Panics
///
/// Panics if an argument is missing or malformed, if the document does not arrive within
/// 10 seconds, or if the document has no object-valued `att` field.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut args = env::args();
    let _app_name = args.next();
    let database_id = args.next().expect("expected App ID as first parameter");
    let doc_id = args
        .next()
        .expect("expected target document ID as second parameter");
    let connect_port: u16 = args
        .next()
        .expect("expected connect port as third parameter")
        .parse()
        .expect("expected port to be a valid u16");

    let root = TempRoot::new();
    let ditto = Ditto::open_sync(
        DittoConfig::new(
            database_id,
            DittoConfigConnect::SmallPeersOnly { private_key: None },
        )
        .with_persistence_directory(root.root_path()),
    )?;
    ditto.set_license_from_env("DITTO_LICENSE")?;

    DittoLogger::set_minimum_log_level(LogLevel::Debug);

    // The counterpart peer is expected to be reachable on the loopback interface.
    let other_peer = format!("127.0.0.1:{connect_port}");
    ditto.update_transport_config(|tc| {
        tc.connect.tcp_servers.insert(other_peer);
    });

    ditto.sync().start()?;
    let ditto = Arc::new(ditto);

    println!("old_sync_test looking for doc_id {doc_id}");
    let doc_id = DocumentId::new(&doc_id).unwrap();

    let query = "SELECT * FROM test_coll WHERE _id = :id";
    let params = json!({"id": doc_id});
    let query = (query, params);

    let (doc_tx, mut doc_rx) = tokio::sync::mpsc::channel(5);
    let _sub = ditto.sync().register_subscription(&query).unwrap();
    let _obs = ditto
        .store()
        .register_observer(&query, move |query_result| {
            let Some(item) = query_result.iter().next() else {
                return;
            };

            // Only the first delivery is consumed below. Later deliveries may find the
            // channel full (or closed), which is not an error for this test, so the send
            // result is deliberately ignored instead of panicking inside the callback.
            let _ = doc_tx.try_send(item.value());
        })
        .unwrap();

    let doc = tokio::time::timeout(Duration::from_secs(10), doc_rx.recv())
        .await
        .expect("doc timeout")
        .expect("doc received Some");
    println!("Received doc: {doc:?}");

    let attachment_token = doc
        .get("att")
        .expect("should have attachment")
        .as_object()
        .expect("attachment is object");
    println!("Got attachment token");

    // Verification failures are reported back to `main` through this channel so that they
    // surface as a descriptive error instead of a panic inside the fetch callback.
    let (failure_tx, mut failure_rx) = tokio::sync::mpsc::channel::<String>(1);
    let _fetcher = ditto
        .store()
        .fetch_attachment(attachment_token, move |event| {
            println!("Attachment callback! event={event:?}");
            if let DittoAttachmentFetchEvent::Completed { attachment: att } = event {
                let contents = File::open(att.path()).and_then(|mut file| {
                    let mut buf = vec![];
                    file.read_to_end(&mut buf).map(|_| buf)
                });
                let verdict = match contents {
                    Ok(buf) if buf == EXAMPLE_ATTACHMENT_DATA => Ok(()),
                    Ok(buf) => Err(format!(
                        "attachment mismatch: expected {:?}, got {:?}",
                        EXAMPLE_ATTACHMENT_DATA, buf
                    )),
                    Err(err) => Err(format!("failed to read attachment file: {err}")),
                };
                match verdict {
                    Ok(()) => {
                        println!("old_sync_test found expected attachment");
                        process::exit(0);
                    }
                    Err(message) => {
                        // A full or closed channel means a failure is already being
                        // reported (or `main` has given up); nothing more to do.
                        let _ = failure_tx.try_send(message);
                    }
                }
            }
        })
        .unwrap();
    println!("Setup fetcher");

    // Should succeed within a reasonable time: success exits the process from the callback,
    // so anything that reaches this point is a failure of some kind.
    match tokio::time::timeout(Duration::from_secs(10), failure_rx.recv()).await {
        Ok(Some(message)) => Err(message.into()),
        Ok(None) => Err("attachment fetch callback was dropped before completing".into()),
        Err(_elapsed) => Err("timed out".into()),
    }
}