use std::sync::Arc;
use dwn::{
actor::Actor,
message::{
descriptor::{
protocols::{ProtocolDefinition, ProtocolsFilter},
records::Version,
Descriptor,
},
Data,
},
store::{DataStore, MessageStore, SurrealStore},
DWN,
};
use surrealdb::{engine::local::Mem, Surreal};
use tokio::net::TcpListener;
use tracing_test::traced_test;
/// Fixture handed to every test in this file by `setup_test`.
///
/// Both actors are built from the same attestation, authorization and DID, but
/// each one is attached to its own DWN instance.
struct TestContext<D, M>
where
    D: DataStore,
    M: MessageStore,
{
    /// Actor attached to the DWN that is not served over HTTP; created with no
    /// remotes.
    alice_kyoto: Actor<D, M>,
    /// Actor attached to the DWN that `setup_test` serves over HTTP.
    alice_osaka: Actor<D, M>,
    /// Base URL (`http://localhost:<port>`) of the served DWN.
    osaka_url: String,
}
/// Builds the two-node fixture shared by the tests in this file.
///
/// * "Osaka": a DWN over an in-memory SurrealDB store, served over HTTP on a
///   port obtained from `port_scanner::request_open_port`.
/// * "Kyoto": a second DWN over its own in-memory store. Its actor reuses
///   Osaka's attestation, authorization and DID, and starts with no remotes.
///
/// The listening socket is bound *before* the server task is spawned, so the
/// port is already accepting connections when this function returns. No
/// polling for readiness is needed, and a bind failure surfaces here instead
/// of inside a detached task.
///
/// # Panics
///
/// Panics if no open port can be obtained, if either store cannot be created,
/// if the actor cannot be created, or if the listener cannot be bound.
async fn setup_test() -> TestContext<impl DataStore, impl MessageStore> {
    let port = port_scanner::request_open_port().unwrap();

    // Osaka: the node that is reachable over HTTP.
    let db = Surreal::new::<Mem>(()).await.unwrap();
    let store_osaka = SurrealStore::new(db).await.unwrap();
    let dwn_osaka = Arc::new(DWN::from(store_osaka));
    let alice_osaka = Actor::new_did_key(dwn_osaka.clone()).unwrap();

    // Bind eagerly: once `bind` returns the socket is listening, so requests
    // issued by the tests are queued even if the serve task has not run yet.
    let listener = TcpListener::bind(format!("0.0.0.0:{}", port))
        .await
        .unwrap();

    tokio::spawn(async move {
        let router = dwn_server::router(dwn_osaka);
        axum::serve(listener, router).await.unwrap();
    });

    // Kyoto: a separate node with its own store, sharing Alice's identity.
    let db = Surreal::new::<Mem>(()).await.unwrap();
    let store_kyoto = SurrealStore::new(db).await.unwrap();
    let dwn_kyoto = Arc::new(DWN::from(store_kyoto));

    let alice_kyoto = Actor {
        attestation: alice_osaka.attestation.clone(),
        authorization: alice_osaka.authorization.clone(),
        did: alice_osaka.did.clone(),
        dwn: dwn_kyoto,
        remotes: Vec::new(),
    };

    let osaka_url = format!("http://localhost:{}", port);

    TestContext {
        alice_kyoto,
        alice_osaka,
        osaka_url,
    }
}
#[tokio::test]
#[traced_test]
async fn test_read_remote() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url.clone());
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
alice_kyoto.remove_remote(&osaka_url);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
}
#[tokio::test]
#[traced_test]
async fn test_sync_push() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Kyoto!".bytes().collect::<Vec<_>>();
let create = alice_kyoto
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_osaka
.read_record(create.record_id.clone())
.process()
.await;
assert!(read.is_err());
alice_kyoto.sync().await.unwrap();
let read = alice_osaka
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
}
#[tokio::test]
#[traced_test]
async fn test_sync_pull_update() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
let new_data = "Hello again from Osaka!".bytes().collect::<Vec<_>>();
let update = alice_osaka
.update_record(create.record_id.clone(), create.entry_id.clone())
.data(new_data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(update.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
alice_kyoto.sync().await.unwrap();
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&new_data)));
}
#[tokio::test]
#[traced_test]
async fn test_sync_pull_many_updates() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
let new_data = "Hello again from Osaka!".bytes().collect::<Vec<_>>();
let update = alice_osaka
.update_record(create.record_id.clone(), create.entry_id.clone())
.data(new_data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(update.reply.status.code, 200);
let newer_data = "Hello once more from Osaka!".bytes().collect::<Vec<_>>();
let update = alice_osaka
.update_record(create.record_id.clone(), update.entry_id.clone())
.data(newer_data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(update.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
alice_kyoto.sync().await.unwrap();
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&newer_data)));
}
#[tokio::test]
#[traced_test]
async fn test_sync_pull_delete() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
let delete = alice_osaka
.delete_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(delete.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
alice_kyoto.sync().await.unwrap();
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, None);
assert!(matches!(
read.record.descriptor,
Descriptor::RecordsDelete(_)
));
}
#[tokio::test]
#[traced_test]
async fn test_sync_pull_delete_after_update() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
let new_data = "Hello again from Osaka!".bytes().collect::<Vec<_>>();
let update = alice_osaka
.update_record(create.record_id.clone(), create.entry_id.clone())
.data(new_data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(update.reply.status.code, 200);
let read = alice_osaka
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&new_data)));
let delete = alice_osaka
.delete_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(delete.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
alice_kyoto.sync().await.unwrap();
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, None);
assert!(matches!(
read.record.descriptor,
Descriptor::RecordsDelete(_)
));
}
#[tokio::test]
#[traced_test]
async fn test_sync_pull_delete_after_local_update() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
let new_data = "Hello from Kyoto!".bytes().collect::<Vec<_>>();
let update = alice_kyoto
.update_record(create.record_id.clone(), create.entry_id.clone())
.data(new_data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(update.reply.status.code, 200);
let delete = alice_osaka
.delete_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(delete.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&new_data)));
alice_kyoto.sync().await.unwrap();
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, None);
assert!(matches!(
read.record.descriptor,
Descriptor::RecordsDelete(_)
));
}
#[tokio::test]
#[traced_test]
async fn test_sync_update_pulled() {
let TestContext {
mut alice_kyoto,
alice_osaka,
osaka_url,
} = setup_test().await;
alice_kyoto.add_remote(osaka_url);
let data = "Hello from Osaka!".bytes().collect::<Vec<_>>();
let create = alice_osaka
.create_record()
.data(data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(create.reply.status.code, 200);
let read = alice_kyoto
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&data)));
let new_data = "Hello from Kyoto!".bytes().collect::<Vec<_>>();
let update = alice_kyoto
.update_record(create.record_id.clone(), create.entry_id.clone())
.data(new_data.clone())
.data_format("application/json".to_string())
.process()
.await
.unwrap();
assert_eq!(update.reply.status.code, 200);
alice_kyoto.sync().await.unwrap();
let read = alice_osaka
.read_record(create.record_id.clone())
.process()
.await
.unwrap();
assert_eq!(read.status.code, 200);
assert_eq!(read.record.data, Some(Data::new_base64(&new_data)));
}
/// A protocol registered on Kyoto shows up in a protocols query on Osaka once
/// Kyoto has synced.
#[tokio::test]
#[traced_test]
async fn test_sync_protocols() {
    let ctx = setup_test().await;
    let alice_osaka = ctx.alice_osaka;
    let mut alice_kyoto = ctx.alice_kyoto;
    alice_kyoto.add_remote(ctx.osaka_url);

    // Register a published protocol on Kyoto.
    let definition = ProtocolDefinition {
        published: true,
        protocol: "my-protocol-1".to_string(),
        ..Default::default()
    };

    let registered = alice_kyoto
        .register_protocol(definition.clone())
        .process()
        .await
        .unwrap();
    assert_eq!(registered.status.code, 200);

    alice_kyoto.sync().await.unwrap();

    // Query Osaka for that protocol at version 0.0.0.
    let filter = ProtocolsFilter {
        protocol: definition.protocol,
        versions: vec![Version::new(0, 0, 0)],
    };

    let queried = alice_osaka
        .query_protocols(filter)
        .process()
        .await
        .unwrap();
    assert_eq!(queried.status.code, 200);
    assert!(!queried.entries.is_empty());
}