Skip to main content

WriteBatch

Struct WriteBatch 

Source
pub struct WriteBatch { /* private fields */ }
Expand description

Atomic group of writes that may span multiple buckets.

Methods without a bucket suffix target the built-in default bucket. Methods ending in _bucket take the name of an optional named bucket, such as one returned by Db::bucket or Db::bucket_sync.

A batch is committed with crate::Db::write_sync or crate::Db::write. Trine assigns one commit sequence to the entire batch, appends the accepted operations to the WAL for persistent databases, and publishes the batch to the affected memtables atomically from the caller’s point of view.

§Examples

use trine_kv::{Db, WriteBatch, WriteOptions};

let db = Db::open_sync(trine_kv::DbOptions::memory())?;
let users = db.bucket_sync("users")?;

let mut batch = WriteBatch::new();
batch.put(b"system:ready", b"yes");
batch.put_bucket(users.name().as_str(), b"1", b"Ada")?;

let commit = db.write_sync(batch, WriteOptions::sync_all())?;
assert!(commit.sequence().get() > 0);

Implementations§

Source§

impl WriteBatch

Source

pub const fn new() -> Self

Creates an empty batch.

The batch does not reserve a commit sequence until it is passed to a database write method.

Examples found in repository?
examples/event_index.rs (line 58)
57    fn append(&self, event: &Event) -> Result<()> {
58        let mut batch = WriteBatch::new();
59        batch.put_bucket("events", event_key(&event.id), event.encode()?)?;
60        batch.put_bucket(
61            "events_by_account",
62            account_event_key(&event.account_id, &event.id),
63            event.id.as_bytes(),
64        )?;
65        self.db.write_sync(batch, WriteOptions::default())?;
66        Ok(())
67    }
More examples
Hide additional examples
examples/sync_quickstart.rs (line 15)
3fn main() -> trine_kv::Result<()> {
4    let path =
5        std::env::temp_dir().join(format!("trine-kv-sync-quickstart-{}", std::process::id()));
6    if path.exists() {
7        std::fs::remove_dir_all(&path)?;
8    }
9
10    let db = Db::open_sync(&path)?;
11    let users = db.bucket_sync("users")?;
12
13    users.put_sync(b"user:001", b"Ada")?;
14
15    let mut batch = WriteBatch::new();
16    batch.put_bucket("users", b"user:002", b"Lin")?;
17    batch.put_bucket("users", b"team:core", b"database")?;
18    db.write_sync(batch, WriteOptions::default())?;
19
20    assert_eq!(users.get_sync(b"user:001")?, Some(b"Ada".to_vec()));
21
22    let snapshot = db.snapshot();
23    users.put_sync(b"user:003", b"Grace")?;
24    assert_eq!(snapshot.get_sync(&users, b"user:003")?, None);
25    assert_eq!(users.get_sync(b"user:003")?, Some(b"Grace".to_vec()));
26
27    let user_prefix_keys = users
28        .prefix_sync(b"user:")?
29        .map(|item| item.map(|key_value| display_key(&key_value.key)))
30        .collect::<trine_kv::Result<Vec<_>>>()?;
31    assert_eq!(user_prefix_keys, ["user:001", "user:002", "user:003"]);
32
33    let range = KeyRange::half_open(b"user:001", b"user:004");
34    let range_values = users
35        .range_sync(&range)?
36        .map(|item| item.map(|key_value| display_value(&key_value.value)))
37        .collect::<trine_kv::Result<Vec<_>>>()?;
38    assert_eq!(range_values, ["Ada", "Lin", "Grace"]);
39
40    let mut txn = db.transaction(TransactionOptions::default());
41    assert_eq!(
42        txn.get_bucket_sync("users", b"user:001")?,
43        Some(b"Ada".to_vec())
44    );
45    txn.put_bucket("users", b"user:004", b"Barbara")?;
46    txn.commit_sync()?;
47
48    db.flush_sync()?;
49    drop(users);
50    drop(snapshot);
51    drop(db);
52
53    let reopened = Db::open_sync(&path)?;
54    let users = reopened.bucket_sync("users")?;
55    assert_eq!(users.get_sync(b"user:004")?, Some(b"Barbara".to_vec()));
56
57    let stats = reopened.stats();
58    assert_eq!(stats.live_buckets, 2);
59    assert!(stats.total_tables > 0);
60
61    drop(users);
62    drop(reopened);
63    std::fs::remove_dir_all(path)?;
64    Ok(())
65}
examples/quickstart.rs (line 31)
22async fn run(path: &Path) -> Result<()> {
23    let mut options = DbOptions::new(path);
24    options.background_worker_count = 0;
25
26    let db = Db::open(options).await?;
27    let users = db.bucket("users").await?;
28
29    users.put(b"user:001".to_vec(), b"Ada".to_vec()).await?;
30
31    let mut batch = WriteBatch::new();
32    batch.put_bucket("users", b"user:002".to_vec(), b"Lin".to_vec())?;
33    batch.put_bucket("users", b"team:core".to_vec(), b"database".to_vec())?;
34    db.write(batch, WriteOptions::default()).await?;
35
36    assert_eq!(users.get(b"user:001").await?, Some(b"Ada".to_vec()));
37
38    let snapshot = db.snapshot();
39    users.put(b"user:003".to_vec(), b"Grace".to_vec()).await?;
40    assert_eq!(
41        users.get_at(&snapshot, b"user:003").await?,
42        None,
43        "snapshot reads stay pinned to their original sequence",
44    );
45    assert_eq!(users.get(b"user:003").await?, Some(b"Grace".to_vec()));
46
47    let prefix_rows = collect_lazy_rows(users.prefix_lazy(b"user:".to_vec()).await?).await?;
48    assert_eq!(
49        prefix_rows,
50        [
51            ("user:001".to_owned(), "Ada".to_owned()),
52            ("user:002".to_owned(), "Lin".to_owned()),
53            ("user:003".to_owned(), "Grace".to_owned()),
54        ],
55    );
56
57    let mut transaction = db.transaction(TransactionOptions::default());
58    assert_eq!(
59        transaction.get_bucket("users", b"user:001").await?,
60        Some(b"Ada".to_vec()),
61    );
62    transaction
63        .read_range_bucket("users", KeyRange::half_open(b"user:001", b"user:004"))
64        .await?;
65    transaction.put_bucket("users", b"user:004".to_vec(), b"Barbara".to_vec())?;
66    transaction.commit().await?;
67
68    db.flush().await?;
69    drop(users);
70    drop(snapshot);
71    db.close().await?;
72
73    let reopened = Db::open(DbOptions::new(path).read_only()).await?;
74    let users = reopened.bucket("users").await?;
75    assert_eq!(users.get(b"user:004").await?, Some(b"Barbara".to_vec()));
76
77    let stats = reopened.stats();
78    assert_eq!(stats.live_buckets, 2);
79    assert!(stats.total_tables > 0);
80    assert!(stats.storage_uses_sync_adapter);
81    assert!(!stats.storage_uses_platform_async_io);
82
83    drop(users);
84    reopened.close().await
85}
Source

pub fn put(&mut self, key: impl Into<Vec<u8>>, value: impl Into<Value>)

Adds a key/value write to the default bucket.

§Parameters
  • key: user key bytes for the built-in default bucket.
  • value: value bytes to store.
Source

pub fn put_bucket( &mut self, bucket: impl Into<String>, key: impl Into<Vec<u8>>, value: impl Into<Value>, ) -> Result<()>

Adds a key/value write for a named bucket.

The bucket name must refer to an optional named bucket. Use WriteBatch::put for the built-in default bucket.

§Parameters
  • bucket: target named bucket.
  • key: user key bytes.
  • value: value bytes to store.
§Errors

Returns Error::InvalidOptions if bucket is empty or is the reserved default bucket name.

Examples found in repository?
examples/event_index.rs (line 59)
57    fn append(&self, event: &Event) -> Result<()> {
58        let mut batch = WriteBatch::new();
59        batch.put_bucket("events", event_key(&event.id), event.encode()?)?;
60        batch.put_bucket(
61            "events_by_account",
62            account_event_key(&event.account_id, &event.id),
63            event.id.as_bytes(),
64        )?;
65        self.db.write_sync(batch, WriteOptions::default())?;
66        Ok(())
67    }
More examples
Hide additional examples
examples/sync_quickstart.rs (line 16)
3fn main() -> trine_kv::Result<()> {
4    let path =
5        std::env::temp_dir().join(format!("trine-kv-sync-quickstart-{}", std::process::id()));
6    if path.exists() {
7        std::fs::remove_dir_all(&path)?;
8    }
9
10    let db = Db::open_sync(&path)?;
11    let users = db.bucket_sync("users")?;
12
13    users.put_sync(b"user:001", b"Ada")?;
14
15    let mut batch = WriteBatch::new();
16    batch.put_bucket("users", b"user:002", b"Lin")?;
17    batch.put_bucket("users", b"team:core", b"database")?;
18    db.write_sync(batch, WriteOptions::default())?;
19
20    assert_eq!(users.get_sync(b"user:001")?, Some(b"Ada".to_vec()));
21
22    let snapshot = db.snapshot();
23    users.put_sync(b"user:003", b"Grace")?;
24    assert_eq!(snapshot.get_sync(&users, b"user:003")?, None);
25    assert_eq!(users.get_sync(b"user:003")?, Some(b"Grace".to_vec()));
26
27    let user_prefix_keys = users
28        .prefix_sync(b"user:")?
29        .map(|item| item.map(|key_value| display_key(&key_value.key)))
30        .collect::<trine_kv::Result<Vec<_>>>()?;
31    assert_eq!(user_prefix_keys, ["user:001", "user:002", "user:003"]);
32
33    let range = KeyRange::half_open(b"user:001", b"user:004");
34    let range_values = users
35        .range_sync(&range)?
36        .map(|item| item.map(|key_value| display_value(&key_value.value)))
37        .collect::<trine_kv::Result<Vec<_>>>()?;
38    assert_eq!(range_values, ["Ada", "Lin", "Grace"]);
39
40    let mut txn = db.transaction(TransactionOptions::default());
41    assert_eq!(
42        txn.get_bucket_sync("users", b"user:001")?,
43        Some(b"Ada".to_vec())
44    );
45    txn.put_bucket("users", b"user:004", b"Barbara")?;
46    txn.commit_sync()?;
47
48    db.flush_sync()?;
49    drop(users);
50    drop(snapshot);
51    drop(db);
52
53    let reopened = Db::open_sync(&path)?;
54    let users = reopened.bucket_sync("users")?;
55    assert_eq!(users.get_sync(b"user:004")?, Some(b"Barbara".to_vec()));
56
57    let stats = reopened.stats();
58    assert_eq!(stats.live_buckets, 2);
59    assert!(stats.total_tables > 0);
60
61    drop(users);
62    drop(reopened);
63    std::fs::remove_dir_all(path)?;
64    Ok(())
65}
examples/quickstart.rs (line 32)
22async fn run(path: &Path) -> Result<()> {
23    let mut options = DbOptions::new(path);
24    options.background_worker_count = 0;
25
26    let db = Db::open(options).await?;
27    let users = db.bucket("users").await?;
28
29    users.put(b"user:001".to_vec(), b"Ada".to_vec()).await?;
30
31    let mut batch = WriteBatch::new();
32    batch.put_bucket("users", b"user:002".to_vec(), b"Lin".to_vec())?;
33    batch.put_bucket("users", b"team:core".to_vec(), b"database".to_vec())?;
34    db.write(batch, WriteOptions::default()).await?;
35
36    assert_eq!(users.get(b"user:001").await?, Some(b"Ada".to_vec()));
37
38    let snapshot = db.snapshot();
39    users.put(b"user:003".to_vec(), b"Grace".to_vec()).await?;
40    assert_eq!(
41        users.get_at(&snapshot, b"user:003").await?,
42        None,
43        "snapshot reads stay pinned to their original sequence",
44    );
45    assert_eq!(users.get(b"user:003").await?, Some(b"Grace".to_vec()));
46
47    let prefix_rows = collect_lazy_rows(users.prefix_lazy(b"user:".to_vec()).await?).await?;
48    assert_eq!(
49        prefix_rows,
50        [
51            ("user:001".to_owned(), "Ada".to_owned()),
52            ("user:002".to_owned(), "Lin".to_owned()),
53            ("user:003".to_owned(), "Grace".to_owned()),
54        ],
55    );
56
57    let mut transaction = db.transaction(TransactionOptions::default());
58    assert_eq!(
59        transaction.get_bucket("users", b"user:001").await?,
60        Some(b"Ada".to_vec()),
61    );
62    transaction
63        .read_range_bucket("users", KeyRange::half_open(b"user:001", b"user:004"))
64        .await?;
65    transaction.put_bucket("users", b"user:004".to_vec(), b"Barbara".to_vec())?;
66    transaction.commit().await?;
67
68    db.flush().await?;
69    drop(users);
70    drop(snapshot);
71    db.close().await?;
72
73    let reopened = Db::open(DbOptions::new(path).read_only()).await?;
74    let users = reopened.bucket("users").await?;
75    assert_eq!(users.get(b"user:004").await?, Some(b"Barbara".to_vec()));
76
77    let stats = reopened.stats();
78    assert_eq!(stats.live_buckets, 2);
79    assert!(stats.total_tables > 0);
80    assert!(stats.storage_uses_sync_adapter);
81    assert!(!stats.storage_uses_platform_async_io);
82
83    drop(users);
84    reopened.close().await
85}
Source

pub fn delete(&mut self, key: impl Into<Vec<u8>>)

Adds a point delete to the default bucket.

The delete hides older values for the same key after the batch commits. Snapshots older than the commit sequence can still see earlier values.

Source

pub fn delete_bucket( &mut self, bucket: impl Into<String>, key: impl Into<Vec<u8>>, ) -> Result<()>

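Adds a point delete for a named bucket.

The bucket name must refer to an optional named bucket. Use WriteBatch::delete for the built-in default bucket.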

Source

pub fn delete_range(&mut self, range: KeyRange)

Adds a range delete to the default bucket.

Once the batch commits, the delete hides every key that falls within range from reads at sequences after the commit. The operation is stored as a range tombstone and can conflict with optimistic transactions that read overlapping keys or ranges.

Source

pub fn delete_range_bucket( &mut self, bucket: impl Into<String>, range: KeyRange, ) -> Result<()>

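Adds a range delete for a named bucket.

The bucket name must refer to an optional named bucket. Use WriteBatch::delete_range for the built-in default bucket.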

Source

pub fn operations(&self) -> &[BatchOperation]

Returns the operations in insertion order.

Source

pub fn into_operations(self) -> Vec<BatchOperation>

Consumes the batch and returns its operations in insertion order.

Source

pub fn len(&self) -> usize

Returns the number of operations in the batch.

Source

pub fn is_empty(&self) -> bool

Returns true when the batch contains no operations.

Trait Implementations§

Source§

impl Clone for WriteBatch

Source§

fn clone(&self) -> WriteBatch

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for WriteBatch

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Default for WriteBatch

Source§

fn default() -> WriteBatch

Returns the “default value” for a type. Read more
Source§

impl Eq for WriteBatch

Source§

impl PartialEq for WriteBatch

Source§

fn eq(&self, other: &WriteBatch) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 (const: unstable) · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Source§

impl StructuralPartialEq for WriteBatch

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.