#![allow(dead_code)]
use crate::types::{EditorTextDelta, RevisionedEditorTextDelta, RevisionedTextDelta, TextDelta};
use operational_transform::OperationSeq;
use tracing::{debug, warn};
/// State of an operational-transform server that mediates between changes coming from the
/// daemon (CRDT) side and operations coming from an editor.
///
/// It keeps one revision counter per side, a log of every operation applied to the document,
/// and the daemon operations the editor has not confirmed yet.
#[derive(Debug, Default)]
#[must_use]
pub struct OTServer {
/// Incremented once for every call to `apply_editor_operation`.
editor_revision: usize,
/// Incremented once for every call to `apply_crdt_change`.
daemon_revision: usize,
/// Every operation applied to `current_content`, in application order.
operations: Vec<OperationSeq>,
/// Daemon operations the editor has not confirmed yet; confirmed ones are drained and the
/// rest are replaced by their transformed versions in `apply_editor_operation`.
editor_queue: Vec<OperationSeq>,
/// The document with all daemon and (transformed) editor operations applied.
current_content: String,
/// The document as reconstructed from the operations the editor has confirmed plus the
/// editor's own operations.
last_confirmed_editor_content: String,
}
impl OTServer {
pub fn new(initial_content: String) -> Self {
Self {
last_confirmed_editor_content: initial_content.clone(),
current_content: initial_content,
..Default::default()
}
}
pub fn apply_crdt_change(&mut self, delta: &TextDelta) -> RevisionedEditorTextDelta {
self.operations.push(delta.clone().into());
self.editor_queue.push(delta.clone().into());
self.daemon_revision += 1;
let editor_delta = EditorTextDelta::from_delta(delta.clone(), &self.current_content);
self.current_content = Self::force_apply(&self.current_content, delta.clone().into());
RevisionedEditorTextDelta {
revision: self.editor_revision,
delta: editor_delta,
}
}
/// Integrates an operation sent by the editor and reports what each side has to apply.
///
/// `rev_editor_delta.revision` is the daemon revision the editor had seen when it produced
/// the delta. Queued daemon operations up to that revision count as confirmed and are folded
/// into the last confirmed editor content; the editor's operation is then transformed over
/// the operations that are still unconfirmed.
///
/// Returns the transformed editor operation together with the still-unconfirmed daemon
/// operations, transformed and expressed as editor deltas tagged with the new editor revision.
///
/// # Panics
///
/// Panics if the editor reports a daemon revision newer than the server's own, if fewer
/// operations are queued than the editor is behind, or if an operation cannot be applied or
/// transformed.
pub fn apply_editor_operation(
    &mut self,
    rev_editor_delta: RevisionedEditorTextDelta,
) -> (TextDelta, Vec<RevisionedEditorTextDelta>) {
    let daemon_revision = rev_editor_delta.revision;
    self.editor_revision += 1;

    assert!(
        daemon_revision <= self.daemon_revision,
        "This must not happen, editor has seen a daemon revision from the future."
    );
    let daemon_operations_to_transform = self.daemon_revision - daemon_revision;
    assert!(
        self.editor_queue.len() >= daemon_operations_to_transform,
        "Whoopsie, we don't have enough operations cached. Was this already processed?"
    );
    let seen_operations = self.editor_queue.len() - daemon_operations_to_transform;
    debug!(
        "Editor is confirming {} operations, there are still {} unconfirmed operations.",
        seen_operations, daemon_operations_to_transform,
    );
    if daemon_operations_to_transform > 50 {
        warn!(
            "Editor is {} operations behind, it might have trouble catching up?",
            daemon_operations_to_transform
        );
    }

    // `drain` yields owned operations, so each one can be applied without cloning it.
    for confirmed_editor_op in self.editor_queue.drain(..seen_operations) {
        debug!(
            "Applying confirmed operation {:#?} to last confirmed editor content {:?}",
            &confirmed_editor_op, &self.last_confirmed_editor_content
        );
        self.last_confirmed_editor_content =
            Self::force_apply(&self.last_confirmed_editor_content, confirmed_editor_op);
    }

    // The editor delta is interpreted against the document the editor has confirmed so far.
    let rev_delta = RevisionedTextDelta::from_rev_ed_delta(
        rev_editor_delta,
        &self.last_confirmed_editor_content,
    );
    let op_seq: OperationSeq = rev_delta.delta.into();
    debug!(
        "Applying incoming editor operation {:#?} to last confirmed editor content {:?}",
        &op_seq, &self.last_confirmed_editor_content
    );
    self.last_confirmed_editor_content =
        Self::force_apply(&self.last_confirmed_editor_content, op_seq.clone());

    // Transform the editor operation over the unconfirmed daemon operations; the queue is
    // replaced by the daemon operations transformed over the editor operation.
    let (op_seq, transformed_queue) =
        transform_through_operations(op_seq, &self.editor_queue);
    self.editor_queue = transformed_queue;

    self.operations.push(op_seq.clone());
    self.current_content = Self::force_apply(&self.current_content, op_seq.clone());
    let deltas_for_editor = self.deltas_for_editor();
    (op_seq.into(), deltas_for_editor)
}
/// Expresses every queued (unconfirmed) operation as an editor delta tagged with the current
/// editor revision.
///
/// Each delta is computed against the last confirmed editor content with all earlier queued
/// operations already applied to it.
fn deltas_for_editor(&self) -> Vec<RevisionedEditorTextDelta> {
    let mut simulated = self.last_confirmed_editor_content.clone();
    self.editor_queue
        .iter()
        .map(|queued_op| {
            let as_delta: TextDelta = queued_op.clone().into();
            let for_editor = RevisionedEditorTextDelta::new(
                self.editor_revision,
                EditorTextDelta::from_delta(as_delta, &simulated),
            );
            // Advance the simulated document so the next delta is relative to this one.
            simulated = Self::force_apply(&simulated, queued_op.clone());
            for_editor
        })
        .collect()
}
/// Returns the document with every operation recorded so far applied.
#[must_use]
pub fn current_content(&self) -> &str {
    self.current_content.as_str()
}
/// Applies `op_seq` to `document` and returns the resulting text.
///
/// When the operation's base length is shorter than the document (measured in `char`s), it
/// is first extended with a retain covering the remaining characters.
///
/// # Panics
///
/// Panics if the operation still cannot be applied to the document.
#[must_use]
fn force_apply(document: &str, mut op_seq: OperationSeq) -> String {
    let doc_chars = document.chars().count();
    let covered = op_seq.base_len();
    if doc_chars > covered {
        op_seq.retain((doc_chars - covered) as u64);
    }
    match op_seq.apply(document) {
        Ok(updated) => updated,
        Err(_) => {
            panic!("Could not apply operation {op_seq:?} to string with length {doc_chars} ('{document:?}')")
        }
    }
}
}
/// Transforms `their_op_seq` over every operation in `my_operations`, in order.
///
/// Before each step, whichever of the two operations has the shorter base length is extended
/// with a retain so that both base lengths match. Returns their operation after the last step
/// together with each of my operations transformed against their operation as it stood at
/// that step.
///
/// # Panics
///
/// Panics if a pair of operations cannot be transformed.
fn transform_through_operations(
    mut their_op_seq: OperationSeq,
    my_operations: &[OperationSeq],
) -> (OperationSeq, Vec<OperationSeq>) {
    let mut transformed_mine = Vec::with_capacity(my_operations.len());
    for original_mine in my_operations {
        let mut mine = original_mine.clone();
        let (mine_len, theirs_len) = (mine.base_len(), their_op_seq.base_len());
        if mine_len < theirs_len {
            mine.retain((theirs_len - mine_len) as u64);
        } else {
            their_op_seq.retain((mine_len - theirs_len) as u64);
        }
        let (mine_prime, theirs_prime) = match mine.transform(&their_op_seq) {
            Ok(pair) => pair,
            Err(_) => panic!(
                "Could not transform operations {:?} on top of {:?}.",
                &their_op_seq.ops(),
                &mine.ops()
            ),
        };
        transformed_mine.push(mine_prime);
        their_op_seq = theirs_prime;
    }
    (their_op_seq, transformed_mine)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::factories::*;
use tracing_test::traced_test;
fn compose(delta1: TextDelta, delta2: TextDelta) -> TextDelta {
operational_transform_internals::ot_compose(delta1.into(), delta2.into()).into()
}
mod ot_server_public_interface {
use super::*;
/// Walks one server through interleaved daemon and editor operations and checks what is
/// forwarded to each side as well as the server's internal bookkeeping.
#[traced_test]
#[test]
fn routes_operations_through_server() {
    let mut ot_server: OTServer = OTServer::new("hello".into());

    // Daemon inserts "x"; the editor has sent nothing yet, so the delta carries revision 0.
    let to_editor = ot_server.apply_crdt_change(&insert(1, "x"));
    let expected = EditorTextDelta::from_delta(insert(1, "x"), "hello");
    assert_eq!(to_editor, rev_ed_delta(0, expected));

    // Editor inserts "y" without having seen "x": its operation is shifted for the daemon,
    // and "x" is sent again relative to the editor's document "heyllo".
    let (to_crdt, to_editor) =
        ot_server.apply_editor_operation(rev_ed_delta_single(0, (0, 2), (0, 2), "y"));
    assert_eq!(to_crdt, insert(3, "y"));
    let expected = EditorTextDelta::from_delta(insert(1, "x"), "heyllo");
    assert_eq!(to_editor, vec![rev_ed_delta(1, expected)]);
    assert_eq!(
        ot_server.operations,
        vec![insert(1, "x").into(), insert(3, "y").into()]
    );
    assert_eq!(ot_server.current_content(), "hxeyllo");

    // Daemon inserts "z"; the editor delta is computed against the content before the
    // change, which is "hxeyllo".
    let to_editor = ot_server.apply_crdt_change(&insert(3, "z"));
    let expected = EditorTextDelta::from_delta(insert(3, "z"), "hxeyllo");
    assert_eq!(to_editor, rev_ed_delta(1, expected));
    assert_eq!(ot_server.current_content(), "hxezyllo");

    // Editor confirms "x" (daemon revision 1) and deletes a range that surrounds the
    // not-yet-seen "z", so the deletion is split in two for the daemon.
    let (to_crdt, to_editor) =
        ot_server.apply_editor_operation(rev_ed_delta_single(1, (0, 1), (0, 5), ""));
    assert_eq!(to_crdt, compose(delete(1, 2), delete(2, 2)));
    let expected = EditorTextDelta::from_delta(insert(1, "z"), "hlo");
    assert_eq!(to_editor, vec![rev_ed_delta(2, expected)]);
    assert_eq!(ot_server.current_content(), "hzlo");
    assert_eq!(
        ot_server.operations,
        vec![
            insert(1, "x").into(),
            insert(3, "y").into(),
            insert(3, "z").into(),
            compose(delete(1, 2), delete(2, 2)).into()
        ]
    );
    assert_eq!(ot_server.editor_queue, vec![insert(1, "z").into(),]);
    assert_eq!(ot_server.last_confirmed_editor_content, "hlo");

    // Editor confirms "z" (daemon revision 2) and appends "!"; nothing stays queued.
    let (_to_crdt, _to_editor) =
        ot_server.apply_editor_operation(rev_ed_delta_single(2, (0, 4), (0, 4), "!"));
    assert_eq!(ot_server.editor_queue, vec![]);
    assert_eq!(ot_server.last_confirmed_editor_content, "hzlo!");

    // Two daemon changes the editor has not seen when it sends its next operation.
    let _to_editor = ot_server.apply_crdt_change(&insert(1, "o"));
    let _to_editor = ot_server.apply_crdt_change(&insert(2, "\n"));
    let (_to_crdt, _to_editor) =
        ot_server.apply_editor_operation(rev_ed_delta_single(2, (0, 5), (0, 5), "\nblubb"));
    assert_eq!(ot_server.editor_queue.len(), 2);
    assert_eq!(ot_server.last_confirmed_editor_content, "hzlo!\nblubb");
    assert_eq!(ot_server.current_content(), "ho\nzlo!\nblubb");
}
/// Mixes daemon and editor edits on a multi-line document, including one editor edit that is
/// a daemon revision behind.
#[test]
fn can_do_newline_stuff() {
    let mut server = OTServer::default();

    // Daemon fills the empty document with three lines.
    server.apply_crdt_change(&insert(0, "Let's say\nthis could be\na poem."));

    // Editor reports the latest daemon revision and prepends a title line.
    let seen_revision = server.daemon_revision;
    server.apply_editor_operation(rev_ed_delta_single(
        seen_revision,
        (0, 0),
        (0, 0),
        "THE POEM\n",
    ));
    assert_eq!(
        server.current_content(),
        "THE POEM\nLet's say\nthis could be\na poem."
    );

    // Daemon replaces "this could" with "I want to".
    let replace_at = "THE POEM\nLet's say\n".len();
    let replaced_len = "this could".len();
    server.apply_crdt_change(&replace(replace_at, replaced_len, "I want to"));

    // Editor is one daemon revision behind and replaces "a poem" on the last line.
    let stale_revision = server.daemon_revision - 1;
    server.apply_editor_operation(rev_ed_delta_single(
        stale_revision,
        (3, 0),
        (3, "a poem".len()),
        "the boss",
    ));
    assert_eq!(
        server.current_content(),
        "THE POEM\nLet's say\nI want to be\nthe boss."
    );
}
/// Sends two editor operations through one server, replays the resulting daemon-side deltas
/// into a second server, and checks that the second server emits the same editor deltas and
/// ends up with the same content.
#[test]
fn newline_join_behavior() {
    let content = "hello\nworld\n";

    let mut first_server = OTServer::new(content.to_string());
    let (append_op, _) =
        first_server.apply_editor_operation(rev_ed_delta_single(0, (0, 5), (0, 5), " world"));
    let (remove_line_op, _) =
        first_server.apply_editor_operation(rev_ed_delta_single(0, (1, 0), (2, 0), ""));
    assert_eq!(append_op, insert(5, " world"));
    assert_eq!(remove_line_op, delete(12, 6));

    let mut second_server = OTServer::new(content.to_string());
    let relayed_append = second_server.apply_crdt_change(&append_op);
    let relayed_removal = second_server.apply_crdt_change(&remove_line_op);
    assert_eq!(
        relayed_append,
        rev_ed_delta_single(0, (0, 5), (0, 5), " world")
    );
    assert_eq!(relayed_removal, rev_ed_delta_single(0, (1, 0), (2, 0), ""));

    assert_eq!(
        first_server.current_content(),
        second_server.current_content()
    );
}
}
mod ot_server_internal_state {
use super::*;
/// Builds a delta that inserts the fixed text "foo" at position `at`.
fn dummy_insert(at: usize) -> TextDelta {
    let payload = "foo";
    insert(at, payload)
}
/// A daemon change bumps the daemon revision and leaves the editor revision alone.
#[test]
fn crdt_change_increases_revision() {
    let mut server = OTServer::new(String::from("foobar"));
    let change = dummy_insert(2);
    server.apply_crdt_change(&change);
    assert_eq!(server.daemon_revision, 1);
    assert_eq!(server.editor_revision, 0);
}
/// An editor operation bumps the editor revision and leaves the daemon revision alone.
#[test]
fn editor_operation_tracks_revision() {
    let mut server = OTServer::new(String::from("foobar"));
    let editor_op = rev_ed_delta_single(0, (0, 0), (0, 0), "x");
    server.apply_editor_operation(editor_op);
    assert_eq!(server.editor_revision, 1);
    assert_eq!(server.daemon_revision, 0);
}
/// A daemon change is placed in the queue of operations awaiting editor confirmation.
#[test]
fn crdt_change_tracks_in_queue() {
    let mut server = OTServer::new(String::from("foobar"));
    server.apply_crdt_change(&dummy_insert(2));
    let expected_queue: Vec<OperationSeq> = vec![dummy_insert(2).into()];
    assert_eq!(server.editor_queue, expected_queue);
}
/// An editor operation that reports daemon revision 1 removes exactly the first of three
/// queued daemon operations.
#[test]
fn editor_operation_reduces_editor_queue() {
    let mut server = OTServer::new(String::from("xx"));
    for position in [2, 5, 8] {
        server.apply_crdt_change(&dummy_insert(position));
    }
    assert_eq!(server.editor_queue.len(), 3);

    let editor_op = rev_ed_delta_single(1, (0, 2), (0, 2), "foo");
    server.apply_editor_operation(editor_op);
    assert_eq!(server.editor_queue.len(), 2);
}
/// Replacing one character in the editor yields a retain/insert/delete delta and updates
/// both the current and the last confirmed editor content.
#[test]
fn replace_single_character() {
    let mut server = OTServer::new("hello".into());
    let editor_op = rev_ed_delta_single(0, (0, 1), (0, 2), "u");
    let (forwarded, _) = server.apply_editor_operation(editor_op);

    let mut expected = TextDelta::default();
    expected.retain(1);
    expected.insert("u");
    expected.delete(1);

    assert_eq!(expected, forwarded);
    assert_eq!(server.current_content(), "hullo");
    assert_eq!(server.last_confirmed_editor_content, "hullo");
}
}
mod operational_transform_internals {
use super::*;
use operational_transform::Operation as OTOperation;
/// Builds an operation sequence that retains `at` characters and then inserts `s`.
fn ot_insert(at: usize, s: &str) -> OperationSeq {
    let mut seq = OperationSeq::default();
    let skipped = at as u64;
    seq.retain(skipped);
    seq.insert(s);
    seq
}
/// Builds an operation sequence that retains `from` characters and then deletes `length`
/// characters.
fn ot_delete(from: usize, length: usize) -> OperationSeq {
    let mut seq = OperationSeq::default();
    let (skipped, removed) = (from as u64, length as u64);
    seq.retain(skipped);
    seq.delete(removed);
    seq
}
/// Composes `op1` followed by `op2`, first extending `op1` with a retain when its target
/// length is shorter than `op2`'s base length.
///
/// # Panics
///
/// Panics if the two operations still cannot be composed.
#[expect(clippy::needless_pass_by_value)]
pub fn ot_compose(mut op1: OperationSeq, op2: OperationSeq) -> OperationSeq {
    let (produced, required) = (op1.target_len(), op2.base_len());
    if required > produced {
        op1.retain((required - produced) as u64);
    }
    let composed = op1.compose(&op2);
    composed.expect("Composition failed. Lengths messed up?")
}
/// An insert at position 0 transformed over two queued inserts ends up at position 6, and
/// each queued insert gains a retain of 3.
#[test]
fn transforms_operation_correctly() {
    let mut queued = vec![ot_insert(0, "foo"), ot_insert(3, "foo")];
    let incoming = ot_insert(0, "bar");

    let (incoming_prime, queued_prime) = transform_through_operations(incoming, &queued);
    assert_eq!(incoming_prime, ot_insert(6, "bar"));

    // Build the expected queue by appending a retain of 3 to each original operation.
    for op in &mut queued {
        op.retain(3);
    }
    assert_eq!(queued_prime, queued);
}
/// Operations with different base lengths can still be transformed against each other.
#[test]
fn transforms_operation_correctly_different_base_lengths() {
    let queued = vec![ot_insert(3, "foo")];
    let incoming = ot_insert(0, "bar");

    let (incoming_prime, queued_prime) =
        transform_through_operations(incoming.clone(), &queued);

    // The expected result is the incoming insert followed by a retain of 6.
    let mut expected_incoming = incoming;
    expected_incoming.retain(6);
    assert_eq!(expected_incoming, incoming_prime);
    assert_eq!(queued_prime, vec![ot_insert(6, "foo")]);
}
/// An insert that falls inside a queued deletion moves to the deletion's start, and the
/// deletion is split into two parts.
#[test]
fn transforms_operation_correctly_splits_deletion() {
    let incoming = ot_insert(2, "x");
    let queued = vec![ot_delete(1, 3)];

    let (incoming_prime, queued_prime) = transform_through_operations(incoming, &queued);

    assert_eq!(incoming_prime, ot_insert(1, "x"));
    let split_deletion = ot_compose(ot_delete(1, 1), ot_delete(2, 2));
    assert_eq!(queued_prime, vec![split_deletion]);
}
/// Pins down the behavior of `OperationSeq::transform` that the server relies on: an insert
/// against an overlapping deletion, and two inserts at the same position.
#[test]
fn ot_transform_does_what_we_think() {
    let insert_x = {
        let mut seq = OperationSeq::default();
        seq.retain(2);
        seq.insert("x");
        seq.retain(1);
        seq
    };
    let delete_two = {
        let mut seq = OperationSeq::default();
        seq.retain(1);
        seq.delete(2);
        seq
    };
    let insert_y = {
        let mut seq = OperationSeq::default();
        seq.retain(2);
        seq.insert("y");
        seq.retain(1);
        seq
    };

    // Insert against a deletion that covers the insert position.
    let (x_after_delete, delete_after_x) = insert_x
        .transform(&delete_two)
        .expect("Transform failed. Do the lengths fit?");
    assert_eq!(
        x_after_delete.ops(),
        vec![OTOperation::Retain(1), OTOperation::Insert("x".to_string())]
    );
    assert_eq!(
        delete_after_x.ops(),
        vec![
            OTOperation::Retain(1),
            OTOperation::Delete(1),
            OTOperation::Retain(1),
            OTOperation::Delete(1)
        ]
    );

    // Two inserts at the same position.
    let (x_after_y, y_after_x) = insert_x
        .transform(&insert_y)
        .expect("Transform failed. Do the lengths fit?");
    assert_eq!(
        x_after_y.ops(),
        vec![
            OTOperation::Retain(2),
            OTOperation::Insert("x".to_string()),
            OTOperation::Retain(2)
        ]
    );
    assert_eq!(
        y_after_x.ops(),
        vec![
            OTOperation::Retain(3),
            OTOperation::Insert("y".to_string()),
            OTOperation::Retain(1)
        ]
    );
}
}
}