#[cfg(loom)]
mod loom_test {
use loom::{explore, stop_exploring, sync::atomic::AtomicUsize, thread::yield_now};
use loro::{ExportMode, LoroDoc, UpdateOptions};
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
#[test]
fn concurrently_inserting_text_content() {
loom::model(|| {
let doc = LoroDoc::new();
let doc1 = doc.clone();
let doc2 = doc.clone();
let h0 = loom::thread::spawn(move || {
doc1.get_text("text").insert(0, "1").unwrap();
});
let h1 = loom::thread::spawn(move || {
for _ in 0..2 {
doc2.get_text("text").insert(0, "2").unwrap();
}
});
if let Err(e) = h0.join() {
eprintln!("Thread h0 failed: {:?}", e);
panic!("Thread h0 failed: {:?}", e);
}
if let Err(e) = h1.join() {
eprintln!("Thread h1 failed: {:?}", e);
panic!("Thread h1 failed: {:?}", e);
}
let text = doc.get_text("text");
assert_eq!(text.len_utf8(), 3);
dbg!("{}", text.to_string());
});
}
#[test]
fn concurrently_creating_events_with_subscriber() {
loom::model(|| {
let doc = LoroDoc::new();
let count = std::sync::Arc::new(AtomicUsize::new(0));
let count_clone = count.clone();
let _sub = doc.subscribe_root(std::sync::Arc::new(move |e| {
stop_exploring();
for e in e.events {
let v = e.diff.as_text().unwrap();
for v in v {
match &v {
loro::TextDelta::Retain { .. } => unreachable!(),
loro::TextDelta::Delete { .. } => unreachable!(),
loro::TextDelta::Insert { insert, .. } => {
count_clone.fetch_add(insert.len(), Ordering::SeqCst);
}
}
}
}
explore();
}));
let doc1 = doc.clone();
let doc2 = doc.clone();
let h0 = loom::thread::spawn(move || {
doc1.get_text("text").insert(0, "1").unwrap();
doc1.commit();
});
let h1 = loom::thread::spawn(move || {
doc2.get_text("text").insert(0, "2").unwrap();
doc2.commit();
});
h0.join().unwrap();
h1.join().unwrap();
let text = doc.get_text("text");
assert_eq!(text.len_utf8(), 2);
assert_eq!(count.load(Ordering::SeqCst), 2);
});
}
#[test]
fn concurrent_callbacks_modifying_same_doc() {
loom::model(|| {
let doc = LoroDoc::new();
let text_id = "shared_text";
let pair =
std::sync::Arc::new((loom::sync::Mutex::new(false), loom::sync::Condvar::new()));
let pair_clone1 = pair.clone();
let pair_clone2 = pair.clone();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let doc1 = doc.clone();
let _sub1 = doc.subscribe_root(std::sync::Arc::new(move |_| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 2 {
doc1.get_text(text_id).insert(0, "A").unwrap();
doc1.commit();
let (lock, cvar) = &*pair_clone1;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
}));
let doc2 = doc.clone();
let doc3 = doc.clone();
let h = loom::thread::spawn(move || {
let (lock, cvar) = &*pair_clone2;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
drop(started);
doc2.get_text(text_id).insert(0, "B").unwrap();
doc2.commit();
doc2.commit();
});
doc3.get_text(text_id).insert(0, "Start").unwrap();
doc3.commit();
h.join().unwrap();
});
}
#[test]
fn concurrent_document_checkout_with_modifications() {
let mut builder = loom::model::Builder::new();
builder.max_branches = 3000;
builder.check(|| {
let doc = LoroDoc::new();
doc.set_detached_editing(true);
doc.get_text("text").insert(0, "Initial state").unwrap();
doc.commit();
let initial_frontier = doc.state_frontiers();
doc.get_text("text")
.insert(doc.get_text("text").len_utf8(), " - First update")
.unwrap();
doc.commit();
let second_frontier = doc.state_frontiers();
doc.get_text("text")
.insert(doc.get_text("text").len_utf8(), " - Second update")
.unwrap();
doc.commit();
let third_frontier = doc.state_frontiers();
let doc1 = doc.clone();
let doc2 = doc.clone();
let initial_clone = initial_frontier.clone();
let second_clone = second_frontier.clone();
let third_clone = third_frontier.clone();
let h1 = loom::thread::spawn(move || {
doc1.checkout(&initial_clone).unwrap();
doc1.get_text("text")
.insert(doc1.get_text("text").len_utf8(), " - Thread 1 modification")
.unwrap();
doc1.commit();
yield_now();
doc1.checkout(&third_clone).unwrap();
doc1.get_text("text")
.insert(0, " - Thread 1 after checkout")
.unwrap();
doc1.commit();
});
let h2 = loom::thread::spawn(move || {
yield_now();
doc2.checkout(&second_clone).unwrap();
doc2.get_text("text")
.insert(doc2.get_text("text").len_utf8(), " - Thread 2 modification")
.unwrap();
doc2.commit();
yield_now();
doc2.checkout_to_latest();
doc2.get_text("text")
.insert(0, " - Thread 2 after checkout")
.unwrap();
doc2.commit();
});
h1.join().unwrap();
h2.join().unwrap();
});
}
#[test]
fn concurrently_import_export() {
let mut builder = loom::model::Builder::new();
builder.max_branches = 2000;
builder.check(|| {
let doc1 = LoroDoc::new();
let doc1_clone = doc1.clone();
let doc1_clone2 = doc1.clone();
let doc2 = LoroDoc::new();
let doc2_clone = doc2.clone();
let doc2_clone2 = doc2.clone();
let mut handlers = vec![];
handlers.push(loom::thread::spawn(move || {
doc1_clone.get_text("text").insert(0, "1").unwrap();
doc1_clone.commit();
doc1_clone.get_text("text").insert(0, "1").unwrap();
doc1_clone.commit();
}));
handlers.push(loom::thread::spawn(move || {
doc2_clone.get_text("text").insert(0, "2").unwrap();
doc2_clone.commit();
}));
handlers.push(loom::thread::spawn(move || {
let e = &doc1_clone2
.export(ExportMode::updates(&doc2_clone2.oplog_vv()))
.unwrap();
doc2_clone2.import(e).unwrap();
yield_now();
let e = &doc2_clone2
.export(ExportMode::updates(&doc1_clone2.oplog_vv()))
.unwrap();
doc1_clone2.import(e).unwrap();
}));
for h in handlers {
h.join().unwrap();
}
});
}
#[test]
fn local_edits_during_batch_import() {
let mut builder = loom::model::Builder::new();
builder.max_branches = 3000;
builder.check(move || {
let doc = LoroDoc::new();
doc.get_text("text").insert(0, "hello").unwrap();
let update_a = doc.export(ExportMode::all_updates()).unwrap();
doc.get_text("text")
.update("yo! hello", UpdateOptions::default())
.unwrap();
let update_b = doc.export(ExportMode::all_updates()).unwrap();
let mut handlers = vec![];
let doc = LoroDoc::new();
let doc_clone = doc.clone();
handlers.push(loom::thread::spawn(move || {
doc_clone.get_text("text").insert(0, "1").unwrap();
doc_clone.commit();
doc_clone.get_text("text").insert(0, "1").unwrap();
doc_clone.commit();
}));
handlers.push(loom::thread::spawn(move || {
doc.import_batch(&[update_a, update_b]).unwrap();
}));
for h in handlers {
h.join().unwrap();
}
});
}
}