1use std::{
2 future::Future,
3 path::{Path, PathBuf},
4 sync::Arc,
5 task::{Context, Poll, Wake, Waker},
6 thread,
7 time::Duration,
8};
9
10use trine_kv::{
11 Db, DbOptions, KeyRange, LazyIter, Result, TransactionOptions, WriteBatch, WriteOptions,
12};
13
14fn main() -> Result<()> {
15 let path = temp_path("trine-kv-quickstart");
16 reset_dir(&path)?;
17 block_on(run(&path))?;
18 reset_dir(&path)?;
19 Ok(())
20}
21
22async fn run(path: &Path) -> Result<()> {
23 let mut options = DbOptions::new(path);
24 options.background_worker_count = 0;
25
26 let db = Db::open(options).await?;
27 let users = db.bucket("users").await?;
28
29 users.put(b"user:001".to_vec(), b"Ada".to_vec()).await?;
30
31 let mut batch = WriteBatch::new();
32 batch.put_bucket("users", b"user:002".to_vec(), b"Lin".to_vec())?;
33 batch.put_bucket("users", b"team:core".to_vec(), b"database".to_vec())?;
34 db.write(batch, WriteOptions::default()).await?;
35
36 assert_eq!(users.get(b"user:001").await?, Some(b"Ada".to_vec()));
37
38 let snapshot = db.snapshot();
39 users.put(b"user:003".to_vec(), b"Grace".to_vec()).await?;
40 assert_eq!(
41 users.get_at(&snapshot, b"user:003").await?,
42 None,
43 "snapshot reads stay pinned to their original sequence",
44 );
45 assert_eq!(users.get(b"user:003").await?, Some(b"Grace".to_vec()));
46
47 let prefix_rows = collect_lazy_rows(users.prefix_lazy(b"user:".to_vec()).await?).await?;
48 assert_eq!(
49 prefix_rows,
50 [
51 ("user:001".to_owned(), "Ada".to_owned()),
52 ("user:002".to_owned(), "Lin".to_owned()),
53 ("user:003".to_owned(), "Grace".to_owned()),
54 ],
55 );
56
57 let mut transaction = db.transaction(TransactionOptions::default());
58 assert_eq!(
59 transaction.get_bucket("users", b"user:001").await?,
60 Some(b"Ada".to_vec()),
61 );
62 transaction
63 .read_range_bucket("users", KeyRange::half_open(b"user:001", b"user:004"))
64 .await?;
65 transaction.put_bucket("users", b"user:004".to_vec(), b"Barbara".to_vec())?;
66 transaction.commit().await?;
67
68 db.flush().await?;
69 drop(users);
70 drop(snapshot);
71 db.close().await?;
72
73 let reopened = Db::open(DbOptions::new(path).read_only()).await?;
74 let users = reopened.bucket("users").await?;
75 assert_eq!(users.get(b"user:004").await?, Some(b"Barbara".to_vec()));
76
77 let stats = reopened.stats();
78 assert_eq!(stats.live_buckets, 2);
79 assert!(stats.total_tables > 0);
80 assert!(stats.storage_uses_sync_adapter);
81 assert!(!stats.storage_uses_platform_async_io);
82
83 drop(users);
84 reopened.close().await
85}
86
87async fn collect_lazy_rows(mut iter: LazyIter) -> Result<Vec<(String, String)>> {
88 let mut rows = Vec::new();
89 while let Some(item) = iter.next().await? {
90 let value = item.value.read().await?;
91 rows.push((display_bytes(&item.key), display_bytes(&value)));
92 }
93 Ok(rows)
94}
95
96fn block_on<T>(future: impl Future<Output = T>) -> T {
97 let waker = Waker::from(Arc::new(ThreadWake {
98 thread: thread::current(),
99 }));
100 let mut context = Context::from_waker(&waker);
101 let mut future = std::pin::pin!(future);
102 loop {
103 match Future::poll(future.as_mut(), &mut context) {
104 Poll::Ready(value) => return value,
105 Poll::Pending => thread::park_timeout(Duration::from_millis(10)),
106 }
107 }
108}
109
/// Waker payload that unparks one specific thread when woken.
struct ThreadWake {
    /// Handle of the thread to unpark.
    thread: thread::Thread,
}

impl Wake for ThreadWake {
    /// Unparks the stored thread; the owned `Arc` is dropped afterwards.
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    /// Unparks the stored thread without consuming the `Arc`.
    fn wake_by_ref(self: &Arc<Self>) {
        self.thread.unpark();
    }
}
123
/// Renders `bytes` as text for display.
///
/// Valid UTF-8 is copied as is; invalid sequences are replaced with U+FFFD by
/// `String::from_utf8_lossy`.
fn display_bytes(bytes: &[u8]) -> String {
    match String::from_utf8_lossy(bytes) {
        std::borrow::Cow::Borrowed(text) => text.to_owned(),
        std::borrow::Cow::Owned(text) => text,
    }
}
127
/// Builds a path inside the system temp directory named `<name>-<pid>`, where
/// `<pid>` is the id of the current process.
///
/// Nothing is created on disk; only the path is computed.
fn temp_path(name: &str) -> PathBuf {
    let mut location = std::env::temp_dir();
    let pid = std::process::id();
    location.push(format!("{name}-{pid}"));
    location
}
131
132fn reset_dir(path: &Path) -> Result<()> {
133 match std::fs::remove_dir_all(path) {
134 Ok(()) => {}
135 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
136 Err(error) => return Err(trine_kv::Error::Io(error)),
137 }
138 Ok(())
139}