use std::sync::Arc;
use bytes::Bytes;
use solid_pod_rs::storage::memory::MemoryBackend;
use solid_pod_rs::storage::Storage;
use solid_pod_rs::wac::resolver::{AclResolver, StorageAclResolver};
use solid_pod_rs::wac::{evaluate_access, parse_jsonld_acl, AccessMode, AclDocument};
/// Builds a JSON-LD ACL document holding a single authorisation entry.
///
/// The entry's `acl:agent`, `acl:accessTo` and `acl:mode` are filled in from
/// `agent`, `path` and `mode` respectively. `mode` is the bare mode name
/// (for example `"Read"`); the `acl:` prefix is added here. The document
/// declares the `acl` prefix in its `@context`.
fn acl_body(agent: &str, path: &str, mode: &str) -> Bytes {
    // The template is kept flush-left so the emitted bytes stay unchanged.
    let document = format!(
        r#"{{
"@context": {{"acl": "http://www.w3.org/ns/auth/acl#"}},
"@graph": [{{
"acl:agent": {{"@id": "{agent}"}},
"acl:accessTo": {{"@id": "{path}"}},
"acl:mode": {{"@id": "acl:{mode}"}}
}}]
}}"#
    );
    Bytes::from(document)
}
/// Builds a JSON-LD ACL document with one authorisation entry whose
/// `acl:agentClass` is `foaf:Agent`, whose `acl:accessTo` is `path`, and whose
/// `acl:mode` is `acl:Read`.
///
/// No `@context` is emitted; the prefixed names are written literally.
fn public_read_acl(path: &str) -> Bytes {
    // The template is kept flush-left so the emitted bytes stay unchanged.
    let document = format!(
        r#"{{
"@graph": [{{
"acl:agentClass": {{"@id": "foaf:Agent"}},
"acl:accessTo": {{"@id": "{path}"}},
"acl:mode": {{"@id": "acl:Read"}}
}}]
}}"#
    );
    Bytes::from(document)
}
/// Builds a JSON-LD ACL document with one authorisation entry for `agent` on
/// `path` that lists every mode in `modes`.
///
/// Each mode name is emitted as `{"@id": "acl:<mode>"}`. Exactly one mode is
/// written as a bare object; any other count (including zero, which yields
/// `[]`) is written as a JSON array.
fn multi_mode_acl(agent: &str, path: &str, modes: &[&str]) -> Bytes {
    let rendered: Vec<String> = modes
        .iter()
        .map(|mode| format!(r#"{{"@id": "acl:{mode}"}}"#))
        .collect();

    // A lone mode stays a scalar node; everything else becomes an array.
    let mode_json = match rendered.as_slice() {
        [only] => only.clone(),
        several => format!("[{}]", several.join(", ")),
    };

    // The template is kept flush-left so the emitted bytes stay unchanged.
    let document = format!(
        r#"{{
"@graph": [{{
"acl:agent": {{"@id": "{agent}"}},
"acl:accessTo": {{"@id": "{path}"}},
"acl:mode": {mode_json}
}}]
}}"#
    );
    Bytes::from(document)
}
/// Parses `body` into an [`AclDocument`] using `parse_jsonld_acl`.
///
/// # Panics
///
/// Panics if `parse_jsonld_acl` returns an error; in these tests a stored ACL
/// that does not parse is treated as a failure.
fn parse_acl(body: &[u8]) -> AclDocument {
    let parsed = parse_jsonld_acl(body);
    parsed.expect("ACL body should parse as valid JSON-LD")
}
#[tokio::test]
async fn two_writers_same_acl_no_corruption() {
let store = Arc::new(MemoryBackend::new());
let acl_path = "/shared.acl";
store
.put(acl_path, public_read_acl("/shared"), "application/ld+json")
.await
.unwrap();
let n = 50;
let mut handles = Vec::with_capacity(n * 2);
for i in 0..n {
let s = store.clone();
handles.push(tokio::spawn(async move {
let body = acl_body("did:nostr:alice", "/shared", "Read");
s.put(acl_path, body, "application/ld+json").await.unwrap();
("A", i)
}));
}
for i in 0..n {
let s = store.clone();
handles.push(tokio::spawn(async move {
let body = acl_body("did:nostr:bob", "/shared", "Write");
s.put(acl_path, body, "application/ld+json").await.unwrap();
("B", i)
}));
}
for h in handles {
h.await.unwrap();
}
let (body, _meta) = store.get(acl_path).await.unwrap();
let doc = parse_acl(&body);
let graph = doc.graph.as_ref().expect("graph must exist");
assert_eq!(graph.len(), 1, "exactly one authorisation rule expected");
let alice_read = evaluate_access(
Some(&doc),
Some("did:nostr:alice"),
"/shared",
AccessMode::Read,
None,
);
let bob_write = evaluate_access(
Some(&doc),
Some("did:nostr:bob"),
"/shared",
AccessMode::Write,
None,
);
assert!(
alice_read ^ bob_write,
"final ACL must be either alice-Read or bob-Write, got alice_read={alice_read} bob_write={bob_write}"
);
}
/// Runs 100 writer tasks that alternate the ACL at `/resource.acl` between an
/// alice/Read and a bob/Write document, alongside 100 reader tasks that
/// resolve the effective ACL for `/resource` through `StorageAclResolver`.
///
/// Every reader must resolve an ACL (the resource is seeded before any task
/// is spawned and is never deleted in this test), that ACL must contain at
/// least one authorisation, and it must grant at least one of: alice Read,
/// bob Write, or anonymous Read (the seeded public-read document).
#[tokio::test]
async fn writer_reader_concurrency_consistent_reads() {
    let store = Arc::new(MemoryBackend::new());
    let acl_path = "/resource.acl";
    let resolver = Arc::new(StorageAclResolver::new(store.clone()));
    store
        .put(
            acl_path,
            public_read_acl("/resource"),
            "application/ld+json",
        )
        .await
        .unwrap();

    let iterations = 100;
    let mut handles = Vec::with_capacity(iterations * 2);

    // Writers: even iterations store alice/Read, odd iterations bob/Write.
    for i in 0..iterations {
        let s = store.clone();
        handles.push(tokio::spawn(async move {
            let body = if i % 2 == 0 {
                acl_body("did:nostr:alice", "/resource", "Read")
            } else {
                acl_body("did:nostr:bob", "/resource", "Write")
            };
            s.put(acl_path, body, "application/ld+json").await.unwrap();
        }));
    }

    // Readers: each resolved snapshot must be one of the known ACL states.
    for _ in 0..iterations {
        let r = resolver.clone();
        handles.push(tokio::spawn(async move {
            // A missing ACL is a failure: skipping the checks on `None` would
            // let the test pass without verifying anything.
            let doc = r
                .find_effective_acl("/resource")
                .await
                .unwrap()
                .expect("effective ACL must resolve: /resource.acl is never deleted");

            assert!(
                matches!(doc.graph.as_ref(), Some(rules) if !rules.is_empty()),
                "resolved ACL must have at least one authorisation"
            );

            let alice_read = evaluate_access(
                Some(&doc),
                Some("did:nostr:alice"),
                "/resource",
                AccessMode::Read,
                None,
            );
            let bob_write = evaluate_access(
                Some(&doc),
                Some("did:nostr:bob"),
                "/resource",
                AccessMode::Write,
                None,
            );
            let public_read =
                evaluate_access(Some(&doc), None, "/resource", AccessMode::Read, None);

            let valid = alice_read || bob_write || public_read;
            assert!(
                valid,
                "read must see a consistent ACL state: \
alice_read={alice_read} bob_write={bob_write} public_read={public_read}"
            );
        }));
    }

    for h in handles {
        h.await.unwrap();
    }
}
/// Overwrites one ACL resource 100 times in sequence, each time granting Read
/// to a different `did:nostr:agent-<i>` identity, then checks the stored
/// document: the last agent written must have Read and no earlier agent may.
#[tokio::test]
async fn rapid_sequential_mutations_verify_final_state() {
    let backend = Arc::new(MemoryBackend::new());
    let acl_path = "/seq.acl";
    let total = 100usize;
    let agent_id = |index: usize| format!("did:nostr:agent-{index}");

    for index in 0..total {
        let body = acl_body(&agent_id(index), "/seq", "Read");
        backend
            .put(acl_path, body, "application/ld+json")
            .await
            .unwrap();
    }

    let (raw, _) = backend.get(acl_path).await.unwrap();
    let doc = parse_acl(&raw);
    let can_read = |agent: &str| {
        evaluate_access(Some(&doc), Some(agent), "/seq", AccessMode::Read, None)
    };

    let last_agent = agent_id(total - 1);
    assert!(can_read(&last_agent), "last agent must have Read");

    // Every agent written before the last one must have been replaced.
    for i in 0..total - 1 {
        let agent = agent_id(i);
        assert!(
            !can_read(&agent),
            "agent-{i} must NOT have Read — only the last writer's ACL survives"
        );
    }
}
#[tokio::test]
async fn contended_rwlock_many_spawned_tasks() {
let store = Arc::new(MemoryBackend::new());
let acl_path = "/contended.acl";
store
.put(
acl_path,
public_read_acl("/contended"),
"application/ld+json",
)
.await
.unwrap();
let writers = 20;
let readers = 80;
let mut handles = Vec::with_capacity(writers + readers);
for i in 0..writers {
let s = store.clone();
handles.push(tokio::spawn(async move {
let agent = format!("did:nostr:writer-{i}");
let body = multi_mode_acl(&agent, "/contended", &["Read", "Write"]);
s.put(acl_path, body, "application/ld+json").await.unwrap();
}));
}
for _ in 0..readers {
let s = store.clone();
handles.push(tokio::spawn(async move {
match s.get(acl_path).await {
Ok((body, _)) => {
let doc = parse_acl(&body);
assert!(doc.graph.is_some(), "ACL document must have a graph field");
}
Err(_) => {
}
}
}));
}
for h in handles {
h.await.unwrap();
}
let (body, _) = store.get(acl_path).await.unwrap();
let doc = parse_acl(&body);
let graph = doc.graph.as_ref().expect("graph present");
assert_eq!(graph.len(), 1, "single authorisation rule");
}
#[tokio::test]
async fn concurrent_delete_and_write_no_panic() {
let store = Arc::new(MemoryBackend::new());
let acl_path = "/ephemeral.acl";
store
.put(
acl_path,
public_read_acl("/ephemeral"),
"application/ld+json",
)
.await
.unwrap();
let rounds = 50;
let mut handles = Vec::with_capacity(rounds * 2);
for _ in 0..rounds {
let s = store.clone();
handles.push(tokio::spawn(async move {
let _ = s.delete(acl_path).await; }));
let s = store.clone();
handles.push(tokio::spawn(async move {
let body = acl_body("did:nostr:phoenix", "/ephemeral", "Read");
let _ = s.put(acl_path, body, "application/ld+json").await;
}));
}
for h in handles {
h.await.unwrap();
}
match store.get(acl_path).await {
Ok((body, _)) => {
let doc = parse_acl(&body);
let graph = doc.graph.as_ref().expect("graph present");
assert!(!graph.is_empty());
}
Err(solid_pod_rs::PodError::NotFound(_)) => {
}
Err(e) => panic!("unexpected error: {e}"),
}
}
#[tokio::test]
async fn resolver_walk_up_under_concurrent_mutations() {
let store = Arc::new(MemoryBackend::new());
let resolver = Arc::new(StorageAclResolver::new(store.clone()));
store
.put("/.acl", public_read_acl("/"), "application/ld+json")
.await
.unwrap();
store
.put(
"/a.acl",
acl_body("did:nostr:alice", "/a", "Write"),
"application/ld+json",
)
.await
.unwrap();
let iterations = 50;
let mut handles = Vec::with_capacity(iterations * 2);
for i in 0..iterations {
let s = store.clone();
handles.push(tokio::spawn(async move {
let body = if i % 2 == 0 {
acl_body("did:nostr:alice", "/a", "Write")
} else {
acl_body("did:nostr:bob", "/a", "Read")
};
s.put("/a.acl", body, "application/ld+json").await.unwrap();
}));
}
for _ in 0..iterations {
let r = resolver.clone();
handles.push(tokio::spawn(async move {
let doc = r.find_effective_acl("/a").await.unwrap();
let d = doc.expect("should find /a.acl");
let graph = d.graph.as_ref().expect("graph present");
assert_eq!(graph.len(), 1);
let alice_write = evaluate_access(
Some(&d),
Some("did:nostr:alice"),
"/a",
AccessMode::Write,
None,
);
let bob_read = evaluate_access(
Some(&d),
Some("did:nostr:bob"),
"/a",
AccessMode::Read,
None,
);
assert!(
alice_write || bob_read,
"must see one of the two valid ACL states"
);
}));
}
for h in handles {
h.await.unwrap();
}
}
/// Subscribes to storage events under `/`, performs 10 sequential ACL writes,
/// and checks that exactly 10 events arrive.
///
/// Each event is awaited for at most two seconds; collection stops early if
/// the receiver yields `None` or a wait times out, in which case the count
/// assertions fail.
#[tokio::test]
async fn watch_events_fire_for_acl_mutations() {
    let backend = Arc::new(MemoryBackend::new());
    let acl_path = "/watched.acl";
    let mut rx = backend.watch("/").await.unwrap();

    let mutations = 10;
    for i in 0..mutations {
        let agent = format!("did:nostr:watcher-{i}");
        backend
            .put(
                acl_path,
                acl_body(&agent, "/watched", "Read"),
                "application/ld+json",
            )
            .await
            .unwrap();
    }

    let per_event_timeout = std::time::Duration::from_secs(2);
    let mut events = Vec::new();
    while events.len() < mutations {
        match tokio::time::timeout(per_event_timeout, rx.recv()).await {
            Ok(Some(ev)) => events.push(ev),
            // Receiver yielded `None` or the wait timed out: stop collecting.
            Ok(None) | Err(_) => break,
        }
    }

    assert!(
        !events.is_empty(),
        "must receive at least one storage event"
    );
    assert_eq!(
        events.len(),
        mutations,
        "must receive exactly {mutations} events, got {}",
        events.len()
    );
}