meshlet-server 0.1.1

Sync relay server for meshlet: CRDT peer that stores and forwards bookmark updates
Documentation
use std::collections::BTreeSet;
use std::sync::Arc;

use axum::body::{to_bytes, Body};
use axum::http::{Request, StatusCode};
use meshlet_core::model::{Bookmark, BookmarkId};
use meshlet_core::MeshletDb;
use meshlet_proto::messages::{SyncRequest, SyncResponse};
use meshlet_server::{app_router, AppState, ServerDoc};
use tempfile::TempDir;
use tokio::sync::Mutex;
use tower::ServiceExt;

/// Test fixture that pairs the server's router with the temporary directory backing it.
struct TestServer {
    /// Router produced by `app_router`; tests send requests to it directly.
    router: axum::Router,
    /// Handle to the temporary data directory. It is never read; it is stored so the
    /// `TempDir` is not dropped (and, per the `tempfile` crate, its directory not removed)
    /// while the fixture is still in use.
    _dir: TempDir,
}

impl TestServer {
    /// Builds a fixture whose server document lives in a fresh temporary directory.
    ///
    /// `token` is copied into the application state as the optional token; pass `None`
    /// to leave it unset.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    fn new(token: Option<&str>) -> Self {
        let dir = tempfile::tempdir().unwrap();

        // Load (or create) the server document first, then record the same path in state.
        let doc = Mutex::new(ServerDoc::load_or_create(dir.path()));
        let data_dir = dir.path().to_path_buf();

        let state = AppState {
            doc,
            token: token.map(str::to_owned),
            data_dir,
        };
        let router = app_router(Arc::new(state));

        Self { router, _dir: dir }
    }
}

/// Runs one complete sync round-trip between the client `db` and the server behind `router`.
///
/// The client exports its updates since the last stored server version vector (or since
/// an empty version vector when none is stored), POSTs them to `/sync` as JSON, asserts a
/// `200 OK`, imports any updates the server returned and finally stores the server's
/// version vector. When `token` is `Some`, it is sent as a `Bearer` authorization header.
///
/// # Panics
///
/// Panics on any database, serialization or transport error, and when the response
/// status is not `200 OK`.
async fn sync_once(router: &axum::Router, db: &MeshletDb, token: Option<&str>) {
    // Upper bound passed to `to_bytes` when reading the response body.
    const MAX_BODY_BYTES: usize = 1024 * 1024;

    // Nothing stored yet means "export relative to an empty version vector".
    let outgoing = if let Some(since) = db.load_last_server_vv().unwrap() {
        db.export_updates_since(&since).unwrap()
    } else {
        let empty = loro::VersionVector::default();
        db.export_updates_since(&empty).unwrap()
    };
    let local_vv = db.oplog_vv();
    let payload = serde_json::to_string(&SyncRequest::new(&local_vv, &outgoing)).unwrap();

    let base = Request::builder()
        .method("POST")
        .uri("/sync")
        .header("content-type", "application/json");
    let builder = match token {
        Some(t) => base.header("authorization", format!("Bearer {t}")),
        None => base,
    };
    let http_request = builder.body(Body::from(payload)).unwrap();
    let response = router.clone().oneshot(http_request).await.unwrap();

    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), MAX_BODY_BYTES)
        .await
        .unwrap();
    let reply: SyncResponse = serde_json::from_slice(&raw).unwrap();

    // Decode both parts of the reply before touching the database.
    let incoming = reply.updates().unwrap();
    let server_vv = reply.server_vv().unwrap();
    if !incoming.is_empty() {
        db.sync_import(&incoming).unwrap();
    }
    db.save_last_server_vv(&server_vv).unwrap();
}

/// Creates a bookmark for tests.
///
/// The bookmark gets a freshly generated id, the given `url`, `title` and `tags`, an
/// empty description, zero flags, and identical creation/update timestamps taken from a
/// single `now_ts()` call.
fn make_bookmark(url: &str, title: &str, tags: &[&str]) -> Bookmark {
    let stamp = meshlet_core::model::now_ts();
    let tag_set: BTreeSet<String> = tags.iter().map(|tag| (*tag).to_owned()).collect();

    Bookmark {
        id: BookmarkId::new(),
        url: url.into(),
        title: title.into(),
        desc: String::new(),
        tags: tag_set,
        flags: 0,
        created_at: stamp,
        updated_at: stamp,
    }
}

/// Two clients that each add a different bookmark must both end up with exactly those
/// two bookmarks after alternating sync rounds against the same server.
#[tokio::test]
async fn two_clients_converge_via_server() {
    const RUST_URL: &str = "https://rust-lang.org";
    const LORO_URL: &str = "https://loro.dev";
    const ROUNDS: usize = 3;

    let server = TestServer::new(None);
    let client_a = MeshletDb::open_in_memory().unwrap();
    let client_b = MeshletDb::open_in_memory().unwrap();

    let rust_bookmark = make_bookmark(RUST_URL, "Rust", &["lang"]);
    client_a.add_bookmark(&rust_bookmark).unwrap();
    let loro_bookmark = make_bookmark(LORO_URL, "Loro", &["crdt"]);
    client_b.add_bookmark(&loro_bookmark).unwrap();

    // Each round syncs A first, then B.
    for _round in 0..ROUNDS {
        for client in [&client_a, &client_b] {
            sync_once(&server.router, client, None).await;
        }
    }

    let a_list = client_a.list_from_mirror().unwrap();
    let b_list = client_b.list_from_mirror().unwrap();

    assert!(a_list.iter().any(|b| b.url.as_str() == RUST_URL));
    assert!(a_list.iter().any(|b| b.url.as_str() == LORO_URL));
    assert!(b_list.iter().any(|b| b.url.as_str() == RUST_URL));
    assert!(b_list.iter().any(|b| b.url.as_str() == LORO_URL));
    assert_eq!(a_list.len(), 2);
    assert_eq!(b_list.len(), 2);
}

/// Two clients that bookmark the same site (URLs differing only by a trailing slash)
/// must each end up with a single entry whose tags are the union of both originals.
#[tokio::test]
async fn dedup_merges_same_url() {
    const ROUNDS: usize = 3;

    let server = TestServer::new(None);
    let client_a = MeshletDb::open_in_memory().unwrap();
    let client_b = MeshletDb::open_in_memory().unwrap();

    let from_a = make_bookmark("https://example.com/", "From A", &["a-tag"]);
    client_a.add_bookmark(&from_a).unwrap();
    // NOTE(review): presumably separates the two bookmarks' timestamps so the merge sees a
    // clear ordering — confirm against the dedup logic.
    std::thread::sleep(std::time::Duration::from_millis(20));
    let from_b = make_bookmark("https://example.com", "From B", &["b-tag"]);
    client_b.add_bookmark(&from_b).unwrap();

    // Each round syncs A first, then B.
    for _round in 0..ROUNDS {
        for client in [&client_a, &client_b] {
            sync_once(&server.router, client, None).await;
        }
    }

    let a_list = client_a.list_from_mirror().unwrap();
    let b_list = client_b.list_from_mirror().unwrap();
    assert_eq!(a_list.len(), 1, "client A should have one entry");
    assert_eq!(b_list.len(), 1, "client B should have one entry");

    let (a, b) = (&a_list[0], &b_list[0]);
    assert_eq!(
        normalize(&a.url),
        normalize(&b.url),
        "both clients should keep the same URL"
    );

    let expected_tags = ["a-tag", "b-tag"];
    assert!(
        expected_tags.iter().all(|tag| a.tags.contains(*tag)),
        "tags should be unioned on A: {:?}",
        a.tags
    );
    assert!(
        expected_tags.iter().all(|tag| b.tags.contains(*tag)),
        "tags should be unioned on B: {:?}",
        b.tags
    );
}

/// Reduces a URL to a loose canonical form for comparisons in tests.
///
/// Surrounding whitespace is trimmed, the text is lowercased, and every trailing `/` is
/// removed.
fn normalize(url: &str) -> String {
    let mut lowered = url.trim().to_lowercase();
    // '/' is a single ASCII byte, so the trimmed length is always a valid char boundary.
    let kept = lowered.trim_end_matches('/').len();
    lowered.truncate(kept);
    lowered
}

/// A server configured with a token must answer `401` to a missing or wrong bearer token
/// and `200` to the matching one.
#[tokio::test]
async fn unauthorized_token_rejected() {
    let server = TestServer::new(Some("secret-token"));

    // Minimal request: empty version vector, no updates.
    let empty_vv = loro::VersionVector::default();
    let body = serde_json::to_string(&SyncRequest::new(&empty_vv, &[])).unwrap();

    let cases = [
        (None, StatusCode::UNAUTHORIZED),
        (Some("wrong"), StatusCode::UNAUTHORIZED),
        (Some("secret-token"), StatusCode::OK),
    ];
    for (token, expected) in cases {
        let response = send(&server.router, token, &body).await;
        assert_eq!(response.status(), expected);
    }
}

/// POSTs `body` as JSON to `/sync` on `router` and returns the raw response.
///
/// When `token` is `Some`, it is sent as a `Bearer` authorization header; otherwise no
/// authorization header is set. The response status is not checked here.
///
/// # Panics
///
/// Panics if the request cannot be built or the router call fails.
async fn send(
    router: &axum::Router,
    token: Option<&str>,
    body: &str,
) -> axum::http::Response<Body> {
    let base = Request::builder()
        .method("POST")
        .uri("/sync")
        .header("content-type", "application/json");
    let builder = match token {
        Some(t) => base.header("authorization", format!("Bearer {t}")),
        None => base,
    };
    let request = builder.body(Body::from(body.to_owned())).unwrap();

    router.clone().oneshot(request).await.unwrap()
}