use std::collections::HashMap;
use crate::cf::{TableMutation, TableMutations};
use crate::doc::CursorValue;
use crate::err::Error;
use crate::kvs::{Key, KeyEncode};
use crate::sql::statements::DefineTableStatement;
use crate::sql::thing::Thing;
use crate::sql::Idiom;
/// One entry produced by `Writer::get` for a buffered `ChangeKey`.
///
/// The tuple fields are, in order: the key built by
/// `crate::key::database::vs::new`, the versionstamped change key prefix, the
/// versionstamped change key suffix, and the buffered mutations converted
/// into a `crate::kvs::Val`.
type PreparedWrite = (Vec<u8>, Vec<u8>, Vec<u8>, crate::kvs::Val);
/// Accumulates change-feed table mutations in an in-memory [`Buffer`] until
/// they are converted into key/value writes by `Writer::get`.
#[non_exhaustive]
pub struct Writer {
// Mutations recorded so far, grouped per namespace/database/table.
buf: Buffer,
}
/// In-memory store of pending table mutations, grouped by [`ChangeKey`].
#[non_exhaustive]
pub struct Buffer {
/// Mutations recorded for each namespace/database/table combination.
pub b: HashMap<ChangeKey, TableMutations>,
}
/// Hashable key used by [`Buffer`] to group mutations.
#[derive(Hash, Eq, PartialEq, Debug)]
#[non_exhaustive]
pub struct ChangeKey {
/// Namespace name.
pub ns: String,
/// Database name.
pub db: String,
/// Table name.
pub tb: String,
}
impl Buffer {
    /// Creates a buffer that holds no mutations.
    pub fn new() -> Self {
        Self {
            b: HashMap::new(),
        }
    }

    /// Appends mutation `m` to the list buffered for table `tb` of database
    /// `db` in namespace `ns`.
    ///
    /// The `TableMutations` entry for that namespace/database/table is created
    /// on first use.
    pub fn push(&mut self, ns: String, db: String, tb: String, m: TableMutation) {
        let key = ChangeKey {
            ns,
            db,
            tb,
        };
        self.b
            .entry(key)
            // Build the entry lazily from the key, so the table name is only
            // cloned (and a `TableMutations` only allocated) when the entry
            // does not exist yet, instead of on every push.
            .or_insert_with_key(|key| TableMutations::new(key.tb.clone()))
            .1
            .push(m);
    }
}
impl Writer {
pub(crate) fn new() -> Self {
Self {
buf: Buffer::new(),
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn record_cf_change(
&mut self,
ns: &str,
db: &str,
tb: &str,
id: Thing,
previous: CursorValue,
current: CursorValue,
store_difference: bool,
) {
if current.as_ref().is_some() {
self.buf.push(
ns.to_string(),
db.to_string(),
tb.to_string(),
match store_difference {
true => {
if previous.as_ref().is_none() {
TableMutation::Set(id, current.into_owned())
} else {
let patches_to_create_previous =
current.diff(&previous, Idiom::default());
TableMutation::SetWithDiff(
id,
current.into_owned(),
patches_to_create_previous,
)
}
}
false => TableMutation::Set(id, current.into_owned()),
},
);
} else {
self.buf.push(
ns.to_string(),
db.to_string(),
tb.to_string(),
match store_difference {
true => TableMutation::DelWithOriginal(id, previous.into_owned()),
false => TableMutation::Del(id),
},
);
}
}
pub(crate) fn define_table(&mut self, ns: &str, db: &str, tb: &str, dt: &DefineTableStatement) {
self.buf.push(
ns.to_string(),
db.to_string(),
tb.to_string(),
TableMutation::Def(dt.to_owned()),
)
}
pub(crate) fn get(&self) -> Result<Vec<PreparedWrite>, Error> {
let mut r = Vec::<(Vec<u8>, Vec<u8>, Vec<u8>, crate::kvs::Val)>::new();
for (
ChangeKey {
ns,
db,
tb,
},
mutations,
) in self.buf.b.iter()
{
let ts_key: Key = crate::key::database::vs::new(ns, db).encode()?;
let tc_key_prefix: Key = crate::key::change::versionstamped_key_prefix(ns, db)?;
let tc_key_suffix: Key = crate::key::change::versionstamped_key_suffix(tb.as_str());
r.push((ts_key, tc_key_prefix, tc_key_suffix, mutations.into()))
}
Ok(r)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::cf::{ChangeSet, DatabaseMutation, TableMutation, TableMutations};
use crate::dbs::Session;
use crate::fflags::FFLAGS;
use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType::*};
use crate::sql::changefeed::ChangeFeed;
use crate::sql::id::Id;
use crate::sql::statements::show::ShowSince;
use crate::sql::statements::{
DefineDatabaseStatement, DefineNamespaceStatement, DefineTableStatement,
};
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::{Datetime, Idiom, Number, Object, Operation, Strand};
use crate::vs::VersionStamp;
// Flag passed as the final argument of `record_change` by the tests below.
const DONT_STORE_PREVIOUS: bool = false;
// Namespace name shared by every test in this module.
const NS: &str = "myns";
// Database name shared by every test in this module.
const DB: &str = "mydb";
// Table name shared by every test in this module.
const TB: &str = "mytb";
// Writes changes in three separate transactions, reads them back by
// versionstamp, garbage-collects with `gc_range`, and finally reads them back
// by timestamp.
#[tokio::test]
async fn changefeed_read_write() {
let ts = Datetime::default();
let ds = init(false).await;
ds.changefeed_process_at(ts.0.timestamp().try_into().unwrap()).await.unwrap();
// Transaction 1: record a change for record "A".
let mut tx1 = ds.transaction(Write, Optimistic).await.unwrap().inner();
let thing_a = Thing {
tb: TB.to_owned(),
id: Id::from("A"),
};
let value_a: Value = "a".into();
let previous = Value::None;
tx1.record_change(
NS,
DB,
TB,
&thing_a,
previous.clone().into(),
value_a.into(),
DONT_STORE_PREVIOUS,
);
tx1.complete_changes(true).await.unwrap();
tx1.commit().await.unwrap();
// Transaction 2: record a change for record "C".
let mut tx2 = ds.transaction(Write, Optimistic).await.unwrap().inner();
let thing_c = Thing {
tb: TB.to_owned(),
id: Id::from("C"),
};
let value_c: Value = "c".into();
tx2.record_change(
NS,
DB,
TB,
&thing_c,
previous.clone().into(),
value_c.into(),
DONT_STORE_PREVIOUS,
);
tx2.complete_changes(true).await.unwrap();
tx2.commit().await.unwrap();
// Transaction 3: record changes for record "B" and, again, record "C".
let mut tx3 = ds.transaction(Write, Optimistic).await.unwrap().inner();
let thing_b = Thing {
tb: TB.to_owned(),
id: Id::from("B"),
};
let value_b: Value = "b".into();
tx3.record_change(
NS,
DB,
TB,
&thing_b,
previous.clone().into(),
value_b.into(),
DONT_STORE_PREVIOUS,
);
let thing_c2 = Thing {
tb: TB.to_owned(),
id: Id::from("C"),
};
let value_c2: Value = "c2".into();
tx3.record_change(
NS,
DB,
TB,
&thing_c2,
previous.clone().into(),
value_c2.into(),
DONT_STORE_PREVIOUS,
);
tx3.complete_changes(true).await.unwrap();
tx3.commit().await.unwrap();
// Read the change sets of the table since versionstamp 0, with a limit of 10.
let start: u64 = 0;
let tx4 = ds.transaction(Write, Optimistic).await.unwrap();
let r = crate::cf::read(&tx4, NS, DB, Some(TB), ShowSince::Versionstamp(start), Some(10))
.await
.unwrap();
tx4.commit().await.unwrap();
// One change set is expected per committed transaction, at versionstamps
// 2, 3 and 4. The expected mutations depend on the
// `change_feed_live_queries` feature flag.
// NOTE(review): the flag-enabled branches expect `SetWithDiff` carrying
// `Value::None` and an empty diff, although the writes above use
// DONT_STORE_PREVIOUS with non-none values - confirm these branches still
// reflect the intended behaviour.
let want: Vec<ChangeSet> = vec![
ChangeSet(
VersionStamp::from_u64(2),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
match FFLAGS.change_feed_live_queries.enabled() {
true => vec![TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "A".to_string())),
Value::None,
vec![],
)],
false => vec![TableMutation::Set(
Thing::from((TB.to_string(), "A".to_string())),
Value::from("a"),
)],
},
)]),
),
ChangeSet(
VersionStamp::from_u64(3),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
match FFLAGS.change_feed_live_queries.enabled() {
true => vec![TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "C".to_string())),
Value::None,
vec![],
)],
false => vec![TableMutation::Set(
Thing::from((TB.to_string(), "C".to_string())),
Value::from("c"),
)],
},
)]),
),
ChangeSet(
VersionStamp::from_u64(4),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
match FFLAGS.change_feed_live_queries.enabled() {
true => vec![
TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "B".to_string())),
Value::None,
vec![],
),
TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "C".to_string())),
Value::None,
vec![],
),
],
false => vec![
TableMutation::Set(
Thing::from((TB.to_string(), "B".to_string())),
Value::from("b"),
),
TableMutation::Set(
Thing::from((TB.to_string(), "C".to_string())),
Value::from("c2"),
),
],
},
)]),
),
];
assert_eq!(r, want);
// Garbage-collect with versionstamp 4 as the boundary, then read again from
// the same starting versionstamp.
let tx5 = ds.transaction(Write, Optimistic).await.unwrap();
crate::cf::gc_range(&tx5, NS, DB, VersionStamp::from_u64(4)).await.unwrap();
tx5.commit().await.unwrap();
let tx6 = ds.transaction(Write, Optimistic).await.unwrap();
let r = crate::cf::read(&tx6, NS, DB, Some(TB), ShowSince::Versionstamp(start), Some(10))
.await
.unwrap();
tx6.commit().await.unwrap();
// After the collection only the change set at versionstamp 4 is expected.
let want: Vec<ChangeSet> = vec![ChangeSet(
VersionStamp::from_u64(4),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
match FFLAGS.change_feed_live_queries.enabled() {
true => vec![
TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "B".to_string())),
Value::None,
vec![],
),
TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "C".to_string())),
Value::None,
vec![],
),
],
false => vec![
TableMutation::Set(
Thing::from((TB.to_string(), "B".to_string())),
Value::from("b"),
),
TableMutation::Set(
Thing::from((TB.to_string(), "C".to_string())),
Value::from("c2"),
),
],
},
)]),
)];
assert_eq!(r, want);
// Process the change feed five seconds after `ts`, then read by timestamp;
// the result is expected to equal the post-collection read above.
ds.changefeed_process_at((ts.0.timestamp() + 5).try_into().unwrap()).await.unwrap();
let tx7 = ds.transaction(Write, Optimistic).await.unwrap();
let r = crate::cf::read(&tx7, NS, DB, Some(TB), ShowSince::Timestamp(ts), Some(10))
.await
.unwrap();
tx7.commit().await.unwrap();
assert_eq!(r, want);
}
// Checks that reading the change feed from a later versionstamp skips entries
// recorded before it, while reading from an earlier one returns both entries.
#[test_log::test(tokio::test)]
async fn scan_picks_up_from_offset() {
    let ds = init(false).await;

    // Checkpoint at timestamp 5, followed by the first entry.
    ds.changefeed_process_at(5).await.unwrap();
    let first_tx = ds.transaction(Write, Optimistic).await.unwrap();
    let _first = record_change_feed_entry(first_tx, "First".to_string()).await;

    // Checkpoint at timestamp 10.
    ds.changefeed_process_at(10).await.unwrap();

    // Resolve both checkpoints to versionstamps.
    let mut lookup = ds.transaction(Write, Optimistic).await.unwrap().inner();
    let vs_at_5 = lookup.get_versionstamp_from_timestamp(5, NS, DB).await.unwrap().unwrap();
    let vs_at_10 = lookup.get_versionstamp_from_timestamp(10, NS, DB).await.unwrap().unwrap();
    lookup.cancel().await.unwrap();

    // The second entry is written after both checkpoints.
    let second_tx = ds.transaction(Write, Optimistic).await.unwrap();
    let _second = record_change_feed_entry(second_tx, "Second".to_string()).await;

    // Reading from the later versionstamp yields exactly one change set.
    let reader = ds.transaction(Write, Optimistic).await.unwrap();
    let since_later = change_feed_vs(reader, &vs_at_10).await;
    assert_eq!(since_later.len(), 1);
    assert!(since_later[0].0 >= vs_at_10, "{:?}", since_later);

    // Reading from the earlier versionstamp yields both change sets.
    let reader = ds.transaction(Write, Optimistic).await.unwrap();
    let since_earlier = change_feed_vs(reader, &vs_at_5).await;
    assert_eq!(since_earlier.len(), 2);
}
#[test_log::test(tokio::test)]
async fn set_with_diff_records_diff_to_achieve_original() {
if !FFLAGS.change_feed_live_queries.enabled() {
return;
}
let ts = Datetime::default();
let ds = init(true).await;
ds.changefeed_process_at(ts.0.timestamp().try_into().unwrap()).await.unwrap();
let thing = Thing {
tb: TB.to_owned(),
id: Id::from("A"),
};
let ses = Session::owner().with_ns(NS).with_db(DB);
let res =
ds.execute(format!("CREATE {thing} SET value=50").as_str(), &ses, None).await.unwrap();
assert_eq!(res.len(), 1, "{:?}", res);
let res = res.into_iter().next().unwrap();
res.result.unwrap();
ds.changefeed_process_at((ts.0.timestamp() + 10).try_into().unwrap()).await.unwrap();
let res = ds
.execute(
format!("UPDATE {thing} SET value=100, new_field=\"new_value\"").as_str(),
&ses,
None,
)
.await
.unwrap();
assert_eq!(res.len(), 1, "{:?}", res);
let res = res.into_iter().next().unwrap();
res.result.unwrap();
let tx = ds.transaction(Write, Optimistic).await.unwrap();
let r = change_feed_ts(tx, &ts).await;
let expected_obj_first = Value::Object(Object::from(map! {
"id".to_string() => Value::Thing(thing.clone()),
"value".to_string() => Value::Number(Number::Int(50)),
}));
let expected_obj_second = Value::Object(Object::from(map! {
"id".to_string() => Value::Thing(thing.clone()),
"value".to_string() => Value::Number(Number::Int(100)),
"new_field".to_string() => Value::Strand(Strand::from("new_value")),
}));
assert_eq!(r.len(), 2, "{:?}", r);
let expected: Vec<ChangeSet> = vec![
ChangeSet(
VersionStamp::from_u64(2),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
vec![TableMutation::Set(
Thing::from((TB.to_string(), "A".to_string())),
expected_obj_first,
)],
)]),
),
ChangeSet(
VersionStamp::from_u64(4),
DatabaseMutation(vec![TableMutations(
TB.to_string(),
vec![TableMutation::SetWithDiff(
Thing::from((TB.to_string(), "A".to_string())),
expected_obj_second,
vec![
Operation::Remove {
path: Idiom::from("new_field"),
},
Operation::Replace {
path: Idiom::from("value"),
value: Value::Number(Number::Int(50)),
},
],
)],
)]),
),
];
assert_eq!(r, expected);
}
// Reads up to 10 change sets of the test table recorded since timestamp `ts`,
// then cancels the transaction.
async fn change_feed_ts(tx: Transaction, ts: &Datetime) -> Vec<ChangeSet> {
    let since = ShowSince::Timestamp(ts.clone());
    let changes = crate::cf::read(&tx, NS, DB, Some(TB), since, Some(10)).await.unwrap();
    tx.cancel().await.unwrap();
    changes
}
// Reads up to 10 change sets of the test table recorded since versionstamp
// `vs`, then cancels the transaction.
async fn change_feed_vs(tx: Transaction, vs: &VersionStamp) -> Vec<ChangeSet> {
    let since = ShowSince::Versionstamp(vs.into_u64_lossy());
    let changes = crate::cf::read(&tx, NS, DB, Some(TB), since, Some(10)).await.unwrap();
    tx.cancel().await.unwrap();
    changes
}
// Records a change setting record `id` of the test table to "a" (with no
// previous value), completes the changes, commits `tx`, and returns the
// record id that was written.
async fn record_change_feed_entry(tx: Transaction, id: String) -> Thing {
    let thing = Thing {
        tb: TB.to_owned(),
        id: Id::from(id),
    };
    let current: Value = "a".into();
    // The lock is taken separately for each call, never held across both.
    tx.lock().await.record_change(
        NS,
        DB,
        TB,
        &thing,
        Value::None.into(),
        current.into(),
        DONT_STORE_PREVIOUS,
    );
    tx.lock().await.complete_changes(true).await.unwrap();
    tx.commit().await.unwrap();
    thing
}
// Builds an in-memory datastore holding the test namespace, database and
// table definitions.
//
// The database and the table both get a change feed using `store_diff`; the
// database feed expires after 10 seconds and the table feed after 10 minutes.
async fn init(store_diff: bool) -> Datastore {
    let dns = DefineNamespaceStatement {
        name: crate::sql::Ident(NS.to_string()),
        ..Default::default()
    };
    let ddb = DefineDatabaseStatement {
        name: crate::sql::Ident(DB.to_string()),
        changefeed: Some(ChangeFeed {
            expiry: Duration::from_secs(10),
            store_diff,
        }),
        ..Default::default()
    };
    let dtb = DefineTableStatement {
        name: TB.into(),
        changefeed: Some(ChangeFeed {
            expiry: Duration::from_secs(10 * 60),
            store_diff,
        }),
        ..Default::default()
    };

    let ds = Datastore::new("memory").await.unwrap();
    let mut tx = ds.transaction(Write, Optimistic).await.unwrap().inner();

    // Store the definitions from the outermost level inwards.
    let ns_root = crate::key::root::ns::new(NS);
    tx.put(&ns_root, dns, None).await.unwrap();
    let db_root = crate::key::namespace::db::new(NS, DB);
    tx.put(&db_root, ddb, None).await.unwrap();
    let tb_root = crate::key::database::tb::new(NS, DB, TB);
    // `dtb` is not used afterwards, so it is moved rather than cloned.
    tx.put(&tb_root, dtb, None).await.unwrap();

    tx.commit().await.unwrap();
    ds
}
}