Skip to main content

quickstart/
quickstart.rs

1use std::{
2    future::Future,
3    path::{Path, PathBuf},
4    sync::Arc,
5    task::{Context, Poll, Wake, Waker},
6    thread,
7    time::Duration,
8};
9
10use trine_kv::{
11    Db, DbOptions, KeyRange, LazyIter, Result, TransactionOptions, WriteBatch, WriteOptions,
12};
13
14fn main() -> Result<()> {
15    let path = temp_path("trine-kv-quickstart");
16    reset_dir(&path)?;
17    block_on(run(&path))?;
18    reset_dir(&path)?;
19    Ok(())
20}
21
22async fn run(path: &Path) -> Result<()> {
23    let mut options = DbOptions::new(path);
24    options.background_worker_count = 0;
25
26    let db = Db::open(options).await?;
27    let users = db.bucket("users").await?;
28
29    users.put(b"user:001".to_vec(), b"Ada".to_vec()).await?;
30
31    let mut batch = WriteBatch::new();
32    batch.put_bucket("users", b"user:002".to_vec(), b"Lin".to_vec())?;
33    batch.put_bucket("users", b"team:core".to_vec(), b"database".to_vec())?;
34    db.write(batch, WriteOptions::default()).await?;
35
36    assert_eq!(users.get(b"user:001").await?, Some(b"Ada".to_vec()));
37
38    let snapshot = db.snapshot();
39    users.put(b"user:003".to_vec(), b"Grace".to_vec()).await?;
40    assert_eq!(
41        users.get_at(&snapshot, b"user:003").await?,
42        None,
43        "snapshot reads stay pinned to their original sequence",
44    );
45    assert_eq!(users.get(b"user:003").await?, Some(b"Grace".to_vec()));
46
47    let prefix_rows = collect_lazy_rows(users.prefix_lazy(b"user:".to_vec()).await?).await?;
48    assert_eq!(
49        prefix_rows,
50        [
51            ("user:001".to_owned(), "Ada".to_owned()),
52            ("user:002".to_owned(), "Lin".to_owned()),
53            ("user:003".to_owned(), "Grace".to_owned()),
54        ],
55    );
56
57    let mut transaction = db.transaction(TransactionOptions::default());
58    assert_eq!(
59        transaction.get_bucket("users", b"user:001").await?,
60        Some(b"Ada".to_vec()),
61    );
62    transaction
63        .read_range_bucket("users", KeyRange::half_open(b"user:001", b"user:004"))
64        .await?;
65    transaction.put_bucket("users", b"user:004".to_vec(), b"Barbara".to_vec())?;
66    transaction.commit().await?;
67
68    db.flush().await?;
69    drop(users);
70    drop(snapshot);
71    db.close().await?;
72
73    let reopened = Db::open(DbOptions::new(path).read_only()).await?;
74    let users = reopened.bucket("users").await?;
75    assert_eq!(users.get(b"user:004").await?, Some(b"Barbara".to_vec()));
76
77    let stats = reopened.stats();
78    assert_eq!(stats.live_buckets, 2);
79    assert!(stats.total_tables > 0);
80    assert!(stats.storage_uses_sync_adapter);
81    assert!(!stats.storage_uses_platform_async_io);
82
83    drop(users);
84    reopened.close().await
85}
86
87async fn collect_lazy_rows(mut iter: LazyIter) -> Result<Vec<(String, String)>> {
88    let mut rows = Vec::new();
89    while let Some(item) = iter.next().await? {
90        let value = item.value.read().await?;
91        rows.push((display_bytes(&item.key), display_bytes(&value)));
92    }
93    Ok(rows)
94}
95
96fn block_on<T>(future: impl Future<Output = T>) -> T {
97    let waker = Waker::from(Arc::new(ThreadWake {
98        thread: thread::current(),
99    }));
100    let mut context = Context::from_waker(&waker);
101    let mut future = std::pin::pin!(future);
102    loop {
103        match Future::poll(future.as_mut(), &mut context) {
104            Poll::Ready(value) => return value,
105            Poll::Pending => thread::park_timeout(Duration::from_millis(10)),
106        }
107    }
108}
109
110struct ThreadWake {
111    thread: thread::Thread,
112}
113
114impl Wake for ThreadWake {
115    fn wake(self: Arc<Self>) {
116        self.thread.unpark();
117    }
118
119    fn wake_by_ref(self: &Arc<Self>) {
120        self.thread.unpark();
121    }
122}
123
124fn display_bytes(bytes: &[u8]) -> String {
125    String::from_utf8_lossy(bytes).into_owned()
126}
127
128fn temp_path(name: &str) -> PathBuf {
129    std::env::temp_dir().join(format!("{name}-{}", std::process::id()))
130}
131
132fn reset_dir(path: &Path) -> Result<()> {
133    match std::fs::remove_dir_all(path) {
134        Ok(()) => {}
135        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
136        Err(error) => return Err(trine_kv::Error::Io(error)),
137    }
138    Ok(())
139}