1use std::{
2 collections::HashMap,
3 path::{Path, PathBuf},
4 pin::Pin,
5 time::Duration,
6 usize,
7};
8
9use async_stream::try_stream;
10use tokio_stream::Stream;
11
12use crate::{
13 remote::{
14 BufferConfiguration, ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment,
15 RemoteTransaction,
16 },
17 service::RemoteMDBXClient,
18 CommitLatency, Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder,
19 EnvironmentFlags, EnvironmentKind, Info, Mode, Stat, TableObject, Transaction, TransactionKind,
20 WriteFlags, RO, RW,
21};
22
/// Result alias used by every fallible function in this module; the error type is
/// always [`ClientError`].
type Result<T> = std::result::Result<T, ClientError>;
24
/// An environment that is backed either by a local [`Environment`] or by a
/// [`RemoteEnvironment`].
///
/// The methods on this type dispatch to whichever variant is wrapped.
#[derive(Debug, Clone)]
pub enum EnvironmentAny {
    /// Wraps a local [`Environment`].
    Local(Environment),
    /// Wraps a [`RemoteEnvironment`].
    Remote(RemoteEnvironment),
}
30
31impl EnvironmentAny {
32 pub fn open_local(path: &Path, builder: EnvironmentBuilder) -> Result<Self> {
33 let db = builder.open(path)?;
34
35 Ok(Self::Local(db))
36 }
37
38 pub async fn open_remote(
39 path: &Path,
40 builder: EnvironmentBuilder,
41 remote: String,
42 deadline: Duration,
43 ) -> Result<Self> {
44 let mut transport = tarpc::serde_transport::tcp::connect(
45 remote,
46 tarpc::tokio_serde::formats::Bincode::default,
47 );
48
49 transport.config_mut().max_frame_length(usize::MAX);
50
51 let transport = transport.await?;
52 let client = RemoteMDBXClient::new(tarpc::client::Config::default(), transport);
53 let env =
54 RemoteEnvironment::open_with_builder(path.to_path_buf(), builder, client, deadline)
55 .await?;
56 Ok(Self::Remote(env))
57 }
58
59 pub async fn open_with_defaults(url: &str, defaults: EnvironmentBuilder) -> Result<Self> {
60 if url.starts_with("mdbx") || url.starts_with("file") {
61 Self::open_url_with_defaults(url, defaults).await
62 } else {
63 Self::open_local(&PathBuf::from(url), defaults)
64 }
65 }
66
67 pub async fn open_url_with_defaults(url: &str, defaults: EnvironmentBuilder) -> Result<Self> {
68 let url = url::Url::parse(url).map_err(|e| ClientError::WrongURL(e))?;
69 let mut builder = defaults;
70
71 let args: HashMap<String, String> = url
72 .query_pairs()
73 .into_iter()
74 .map(|(k, v)| (k.into_owned(), v.into_owned()))
75 .collect();
76
77 let mode = if args.contains_key("ro") {
78 Mode::ReadOnly
79 } else if args.contains_key("rw") {
80 Mode::ReadWrite {
81 sync_mode: crate::SyncMode::Durable,
82 }
83 } else {
84 builder.flags.mode.clone()
85 };
86
87 let exclusive = if args.contains_key("exclusive") {
88 true
89 } else {
90 builder.flags.exclusive
91 };
92 let accede = if args.contains_key("accede") {
93 true
94 } else {
95 builder.flags.accede
96 };
97 let no_sub_dir = if args.contains_key("no_sub_dir") {
98 true
99 } else {
100 builder.flags.no_sub_dir
101 };
102 let flags = EnvironmentFlags {
103 mode,
104 exclusive,
105 accede,
106 no_sub_dir,
107 ..Default::default()
108 };
109
110 let max_readers = args
111 .get("max_readers")
112 .map(|t| u64::from_str_radix(&t, 10))
113 .transpose()
114 .map_err(|_| ClientError::ParseError)?
115 .or(builder.max_readers);
116 let max_dbs = args
117 .get("max_dbs")
118 .map(|t| usize::from_str_radix(&t, 10))
119 .transpose()
120 .map_err(|_| ClientError::ParseError)?
121 .or(builder.max_dbs.map(|t| t as usize));
122 let sync_bytes = args
123 .get("sync_bytes")
124 .map(|t| u64::from_str_radix(&t, 10))
125 .transpose()
126 .map_err(|_| ClientError::ParseError)?
127 .or(builder.sync_bytes);
128 let sync_period = args
129 .get("sync_period")
130 .map(|t| u64::from_str_radix(&t, 10))
131 .transpose()
132 .map_err(|_| ClientError::ParseError)?
133 .or(builder.sync_period);
134
135 builder.set_flags(flags);
136 if let Some(max_db) = max_dbs {
137 builder.set_max_dbs(max_db);
138 }
139
140 if let Some(max_readers) = max_readers {
141 builder.set_max_readers(max_readers);
142 }
143
144 if let Some(sync_bytes) = sync_bytes {
145 builder.set_sync_bytes(sync_bytes as usize);
146 }
147
148 if let Some(sync_period) = sync_period {
149 builder.set_sync_period(Duration::from_secs(sync_period));
150 }
151
152 let deadline = args
153 .get("deadline")
154 .map(|t| u64::from_str_radix(&t, 10))
155 .transpose()
156 .map_err(|_| ClientError::ParseError)?
157 .map(|t| Duration::from_secs(t))
158 .unwrap_or(Duration::from_secs(30));
159
160 match url.scheme() {
161 "file" => Self::open_local(&PathBuf::from(url.path()), builder),
162 "mdbx" => {
163 let fpath = PathBuf::from(url.path());
164 if let Some(host) = url.host_str() {
165 let target = format!("{}:{}", host, url.port().unwrap_or(1899));
166
167 Self::open_remote(&fpath, builder, target, deadline).await
168 } else {
169 Self::open_local(&PathBuf::from(url.path()), builder)
170 }
171 }
172 _ => Err(ClientError::ParseError),
173 }
174 }
175
176 pub async fn open(url: &str) -> Result<Self> {
177 let mut defaults = Environment::builder();
178 defaults
179 .set_flags(EnvironmentFlags {
180 mode: Mode::ReadOnly,
181 ..Default::default()
182 })
183 .set_max_dbs(256)
184 .set_max_readers(256);
185 Self::open_with_defaults(url, defaults).await
186 }
187
188 pub async fn begin_ro_txn(&self) -> Result<TransactionAny<RO>> {
189 match self {
190 Self::Local(env) => {
191 let env = env.clone();
192 Ok(TransactionAny::Local(
193 tokio::task::spawn_blocking(move || env.begin_ro_txn()).await??,
194 ))
195 }
196 Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_ro_txn().await?)),
197 }
198 }
199
200 pub async fn begin_rw_txn(&self) -> Result<TransactionAny<RW>> {
201 match self {
202 Self::Local(env) => {
203 let env = env.clone();
204 Ok(TransactionAny::Local(
205 tokio::task::spawn_blocking(move || env.begin_rw_txn()).await??,
206 ))
207 }
208 Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_rw_txn().await?)),
209 }
210 }
211
212 pub async fn sync(&self, force: bool) -> Result<bool> {
213 match self {
214 Self::Local(env) => {
215 let env = env.clone();
216 Ok(tokio::task::spawn_blocking(move || env.sync(force)).await??)
217 }
218 Self::Remote(env) => Ok(env.sync(force).await?),
219 }
220 }
221
222 pub async fn stat(&self) -> Result<Stat> {
223 match self {
224 Self::Local(env) => Ok(env.stat()?),
225 Self::Remote(env) => Ok(env.stat().await?),
226 }
227 }
228
229 pub async fn info(&self) -> Result<Info> {
230 match self {
231 Self::Local(env) => Ok(env.info()?),
232 Self::Remote(env) => Ok(env.info().await?),
233 }
234 }
235
236 pub fn env_kind(&self) -> EnvironmentKind {
237 match self {
238 Self::Local(env) => env.env_kind(),
239 Self::Remote(env) => env.env_kind(),
240 }
241 }
242
243 pub fn is_write_map(&self) -> bool {
244 self.env_kind().is_write_map()
245 }
246
247 pub async fn is_read_write(&self) -> Result<bool> {
248 Ok(!self.is_read_only().await?)
249 }
250
251 pub async fn is_read_only(&self) -> Result<bool> {
252 Ok(matches!(self.info().await?.mode(), Mode::ReadOnly))
253 }
254}
255
/// A database handle obtained from either a local or a remote transaction.
#[derive(Debug)]
pub enum DatabaseAny {
    /// Wraps a local [`Database`].
    Local(Database),
    /// Wraps a [`RemoteDatabase`].
    Remote(RemoteDatabase),
}
261
262impl DatabaseAny {
263 pub fn dbi(&self) -> u32 {
264 match self {
265 Self::Local(db) => db.dbi(),
266 Self::Remote(db) => db.dbi(),
267 }
268 }
269}
270
/// A transaction of kind `K` that is either local or remote.
///
/// The methods on this type dispatch to whichever variant is wrapped.
#[derive(Debug, Clone)]
pub enum TransactionAny<K: TransactionKind> {
    /// Wraps a local [`Transaction`].
    Local(Transaction<K>),
    /// Wraps a [`RemoteTransaction`].
    Remote(RemoteTransaction<K>),
}
276
277impl<K: TransactionKind> TransactionAny<K> {
278 pub async fn open_db(&self, db: Option<&str>) -> Result<DatabaseAny> {
279 match self {
280 Self::Local(tx) => {
281 let tx = tx.clone();
282 let db = db.map(|t| t.to_string());
283 Ok(DatabaseAny::Local(
284 tokio::task::spawn_blocking(move || {
285 tx.open_db(db.as_ref().map(|t| t.as_str()))
286 })
287 .await??,
288 ))
289 }
290 Self::Remote(tx) => Ok(DatabaseAny::Remote(
291 tx.open_db(db.map(|t| t.to_string())).await?,
292 )),
293 }
294 }
295
296 pub async fn get<V: TableObject>(&self, dbi: u32, key: &[u8]) -> Result<Option<V>> {
297 match self {
298 Self::Local(tx) => Ok(tx.get::<V>(dbi, key)?),
299 Self::Remote(tx) => Ok(tx.get::<V>(dbi, key.to_vec()).await?),
300 }
301 }
302
303 pub async fn db_stat(&self, db: &DatabaseAny) -> Result<Stat> {
304 self.db_stat_with_dbi(db.dbi()).await
305 }
306
307 pub async fn db_stat_with_dbi(&self, dbi: u32) -> Result<Stat> {
308 match self {
309 Self::Local(tx) => Ok(tx.db_stat_with_dbi(dbi)?),
310 Self::Remote(tx) => Ok(tx.db_stat_with_dbi(dbi).await?),
311 }
312 }
313
314 pub async fn cursor_with_dbi(&self, dbi: u32) -> Result<CursorAny<K>> {
315 match self {
316 Self::Local(tx) => Ok(CursorAny::Local(tx.cursor_with_dbi(dbi)?)),
317 Self::Remote(tx) => Ok(CursorAny::Remote(tx.cursor(dbi).await?)),
318 }
319 }
320
321 pub async fn cursor(&self, db: &DatabaseAny) -> Result<CursorAny<K>> {
322 self.cursor_with_dbi(db.dbi()).await
323 }
324}
325
326impl TransactionAny<RW> {
327 pub async fn begin_nested_txn(&mut self) -> Result<Self> {
328 match self {
329 Self::Local(tx) => Ok(Self::Local(tx.begin_nested_txn()?)),
330 Self::Remote(tx) => Ok(Self::Remote(tx.begin_nested_txn().await?)),
331 }
332 }
333
334 pub async fn clear_db(&self, dbi: u32) -> Result<()> {
335 match self {
336 Self::Local(tx) => Ok(tx.clear_db(dbi)?),
337 Self::Remote(tx) => Ok(tx.clear_db(dbi).await?),
338 }
339 }
340
341 pub async fn put(&self, dbi: u32, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
342 match self {
343 Self::Local(tx) => Ok(tx.put(dbi, key, data, flags)?),
344 Self::Remote(tx) => Ok(tx.put(dbi, key.to_vec(), data.to_vec(), flags).await?),
345 }
346 }
347
348 pub async fn del(&self, dbi: u32, key: &[u8], value: Option<&[u8]>) -> Result<bool> {
349 match self {
350 Self::Local(tx) => Ok(tx.del(dbi, key, value)?),
351 Self::Remote(tx) => Ok(tx.del(dbi, key.to_vec(), value.map(|t| t.to_vec())).await?),
352 }
353 }
354
355 pub async fn create_db(&self, db: Option<&str>, flags: DatabaseFlags) -> Result<DatabaseAny> {
356 match self {
357 Self::Local(tx) => {
358 let tx = tx.clone();
359 let db = db.map(|t| t.to_string());
360 Ok(DatabaseAny::Local(
361 tokio::task::spawn_blocking(move || {
362 tx.create_db(db.as_ref().map(|t| t.as_str()), flags)
363 })
364 .await??,
365 ))
366 }
367 Self::Remote(tx) => Ok(DatabaseAny::Remote(
368 tx.create_db(db.map(|t| t.to_string()), flags).await?,
369 )),
370 }
371 }
372
373 pub async fn commit(self) -> Result<(bool, CommitLatency)> {
374 match self {
375 Self::Local(tx) => Ok(tokio::task::spawn_blocking(move || tx.commit()).await??),
376 Self::Remote(tx) => Ok(tx.commit().await?),
377 }
378 }
379}
380
/// A cursor of transaction kind `K` that is either local or remote.
///
/// The methods on this type dispatch to whichever variant is wrapped.
#[derive(Debug)]
pub enum CursorAny<K: TransactionKind> {
    /// Wraps a local [`Cursor`].
    Local(Cursor<K>),
    /// Wraps a [`RemoteCursor`].
    Remote(RemoteCursor<K>),
}
386
387impl<K: TransactionKind> CursorAny<K> {
388 pub async fn cursor_clone(&self) -> Result<Self> {
389 match self {
390 Self::Local(cur) => Ok(Self::Local(cur.clone())),
391 Self::Remote(cur) => Ok(Self::Remote(cur.cur_clone().await?)),
392 }
393 }
394
395 pub async fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
396 where
397 Key: TableObject,
398 Value: TableObject,
399 {
400 match self {
401 Self::Local(cur) => Ok(cur.first()?),
402 Self::Remote(cur) => Ok(cur.first().await?),
403 }
404 }
405
406 pub async fn first_dup<Value>(&mut self) -> Result<Option<Value>>
407 where
408 Value: TableObject,
409 {
410 match self {
411 Self::Local(cur) => Ok(cur.first_dup()?),
412 Self::Remote(cur) => Ok(cur.first_dup().await?),
413 }
414 }
415
416 pub async fn get_both<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
417 where
418 Value: TableObject,
419 {
420 match self {
421 Self::Local(cur) => Ok(cur.get_both(k, v)?),
422 Self::Remote(cur) => Ok(cur.get_both(k.to_vec(), v.to_vec()).await?),
423 }
424 }
425
426 pub async fn get_both_range<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
427 where
428 Value: TableObject,
429 {
430 match self {
431 Self::Local(cur) => Ok(cur.get_both_range(k, v)?),
432 Self::Remote(cur) => Ok(cur.get_both_range(k.to_vec(), v.to_vec()).await?),
433 }
434 }
435
436 pub async fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
437 where
438 Key: TableObject,
439 Value: TableObject,
440 {
441 match self {
442 Self::Local(cur) => Ok(cur.get_current()?),
443 Self::Remote(cur) => Ok(cur.get_current().await?),
444 }
445 }
446
447 pub async fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
448 where
449 Value: TableObject,
450 {
451 match self {
452 Self::Local(cur) => Ok(cur.get_multiple()?),
453 Self::Remote(cur) => Ok(cur.get_multiple().await?),
454 }
455 }
456
457 pub async fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
458 where
459 Key: TableObject,
460 Value: TableObject,
461 {
462 match self {
463 Self::Local(cur) => Ok(cur.last()?),
464 Self::Remote(cur) => Ok(cur.last().await?),
465 }
466 }
467
468 pub async fn last_dup<Value>(&mut self) -> Result<Option<Value>>
469 where
470 Value: TableObject,
471 {
472 match self {
473 Self::Local(cur) => Ok(cur.last_dup()?),
474 Self::Remote(cur) => Ok(cur.last_dup().await?),
475 }
476 }
477
478 pub async fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
479 where
480 Key: TableObject,
481 Value: TableObject,
482 {
483 match self {
484 Self::Local(cur) => Ok(cur.next()?),
485 Self::Remote(cur) => Ok(cur.next().await?),
486 }
487 }
488
489 pub async fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
490 where
491 Key: TableObject,
492 Value: TableObject,
493 {
494 match self {
495 Self::Local(cur) => Ok(cur.next_dup()?),
496 Self::Remote(cur) => Ok(cur.next_dup().await?),
497 }
498 }
499
500 pub async fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
501 where
502 Key: TableObject,
503 Value: TableObject,
504 {
505 match self {
506 Self::Local(cur) => Ok(cur.next_multiple()?),
507 Self::Remote(cur) => Ok(cur.next_multiple().await?),
508 }
509 }
510
511 pub async fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
512 where
513 Key: TableObject,
514 Value: TableObject,
515 {
516 match self {
517 Self::Local(cur) => Ok(cur.next_nodup()?),
518 Self::Remote(cur) => Ok(cur.next_nodup().await?),
519 }
520 }
521
522 pub async fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
523 where
524 Key: TableObject,
525 Value: TableObject,
526 {
527 match self {
528 Self::Local(cur) => Ok(cur.prev()?),
529 Self::Remote(cur) => Ok(cur.prev().await?),
530 }
531 }
532
533 pub async fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
534 where
535 Key: TableObject,
536 Value: TableObject,
537 {
538 match self {
539 Self::Local(cur) => Ok(cur.prev_dup()?),
540 Self::Remote(cur) => Ok(cur.prev_dup().await?),
541 }
542 }
543
544 pub async fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
545 where
546 Key: TableObject,
547 Value: TableObject,
548 {
549 match self {
550 Self::Local(cur) => Ok(cur.prev_nodup()?),
551 Self::Remote(cur) => Ok(cur.prev_nodup().await?),
552 }
553 }
554
555 pub async fn set<Value>(&mut self, key: &[u8]) -> Result<Option<Value>>
556 where
557 Value: TableObject,
558 {
559 match self {
560 Self::Local(cur) => Ok(cur.set(key)?),
561 Self::Remote(cur) => Ok(cur.set(key.to_vec()).await?),
562 }
563 }
564 pub async fn set_key<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
565 where
566 Key: TableObject,
567 Value: TableObject,
568 {
569 match self {
570 Self::Local(cur) => Ok(cur.set_key(key)?),
571 Self::Remote(cur) => Ok(cur.set_key(key.to_vec()).await?),
572 }
573 }
574
575 pub async fn set_range<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
576 where
577 Key: TableObject,
578 Value: TableObject,
579 {
580 match self {
581 Self::Local(cur) => Ok(cur.set_range(key)?),
582 Self::Remote(cur) => Ok(cur.set_range(key.to_vec()).await?),
583 }
584 }
585
586 pub async fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
587 where
588 Key: TableObject,
589 Value: TableObject,
590 {
591 match self {
592 Self::Local(cur) => Ok(cur.prev_multiple()?),
593 Self::Remote(cur) => Ok(cur.prev_multiple().await?),
594 }
595 }
596
597 pub async fn set_lowerbound<Key, Value>(
598 &mut self,
599 key: &[u8],
600 ) -> Result<Option<(bool, Key, Value)>>
601 where
602 Key: TableObject,
603 Value: TableObject,
604 {
605 match self {
606 Self::Local(cur) => Ok(cur.set_lowerbound(key)?),
607 Self::Remote(cur) => Ok(cur.set_lowerbound(key.to_vec()).await?),
608 }
609 }
610
611 fn iter_to_stream<'cur, Key, Value>(
612 itr: crate::cursor::Iter<'cur, K, Key, Value>,
613 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
614 where
615 Key: TableObject + Send + 'cur,
616 Value: TableObject + Send + 'cur,
617 {
618 Box::pin(try_stream! {
619 for it in itr {
620 let (k, v) = it?;
621 yield (k, v);
622 }
623 })
624 }
625
626 fn intoiter_to_stream<'cur, Key, Value>(
627 itr: crate::cursor::IntoIter<'cur, K, Key, Value>,
628 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
629 where
630 Key: TableObject + Send + 'cur,
631 Value: TableObject + Send + 'cur,
632 {
633 Box::pin(try_stream! {
634 for it in itr {
635 let (k, v) = it?;
636 yield (k, v);
637 }
638 })
639 }
640
641 fn iterdup_to_steam<'cur, Key, Value>(
642 iterdup: crate::cursor::IterDup<'cur, K, Key, Value>,
643 ) -> Pin<
644 Box<
645 dyn Stream<
646 Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>>,
647 > + Send
648 + 'cur,
649 >,
650 >
651 where
652 Key: TableObject + Send + 'cur,
653 Value: TableObject + Send + 'cur,
654 {
655 Box::pin(try_stream! {
656 for it in iterdup {
657 let st = Self::intoiter_to_stream(it);
658 yield st;
659 }
660 })
661 }
662
663 pub fn iter<'a, Key, Value>(
664 &'a mut self,
665 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
666 where
667 Key: TableObject + Send + 'a,
668 Value: TableObject + Send + 'a,
669 {
670 match self {
671 Self::Local(cur) => Self::iter_to_stream(cur.iter::<Key, Value>()),
672 Self::Remote(cur) => cur.iter(),
673 }
674 }
675
676 pub fn into_iter_buffered<'a, Key, Value>(
677 self,
678 buffer_config: BufferConfiguration,
679 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
680 where
681 Key: TableObject + Send + 'a,
682 Value: TableObject + Send + 'a,
683 {
684 match self {
685 Self::Local(mut cur) => Box::pin(try_stream! {
686 for it in cur.iter::<Key, Value>() {
687 let (k, v) = it?;
688 yield (k, v);
689 }
690 }),
691 Self::Remote(cur) => cur.into_iter_buffered(buffer_config),
692 }
693 }
694
695 pub fn iter_start<'a, Key, Value>(
696 &'a mut self,
697 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
698 where
699 Key: TableObject + Send + 'a,
700 Value: TableObject + Send + 'a,
701 {
702 match self {
703 Self::Local(cur) => Self::iter_to_stream(cur.iter_start::<Key, Value>()),
704 Self::Remote(cur) => cur.iter_start(),
705 }
706 }
707
708 pub fn into_iter_start_buffered<'a, Key, Value>(
709 self,
710 buffer_config: BufferConfiguration,
711 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
712 where
713 Key: TableObject + Send + 'a,
714 Value: TableObject + Send + 'a,
715 {
716 match self {
717 Self::Local(mut cur) => Box::pin(try_stream! {
718 for it in cur.iter_start::<Key, Value>() {
719 let (k, v) = it?;
720 yield (k, v);
721 }
722 }),
723 Self::Remote(cur) => cur.into_iter_start_buffered(buffer_config),
724 }
725 }
726
727 pub async fn iter_from<'a, Key, Value>(
728 &'a mut self,
729 key: &[u8],
730 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
731 where
732 Key: TableObject + Send + 'a,
733 Value: TableObject + Send + 'a,
734 {
735 Ok(match self {
736 Self::Local(cur) => Self::iter_to_stream(cur.iter_from::<Key, Value>(&key)),
737 Self::Remote(cur) => cur.iter_from(key.to_vec()).await?,
738 })
739 }
740
741 pub async fn into_iter_from_buffered<'a, Key, Value>(
742 self,
743 key: &'a [u8],
744 buffer_config: BufferConfiguration,
745 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
746 where
747 Key: TableObject + Send + 'a,
748 Value: TableObject + Send + 'a,
749 {
750 Ok(match self {
751 Self::Local(mut cur) => Box::pin(try_stream! {
752 for it in cur.iter_from::<Key, Value>(&key) {
753 let (k, v) = it?;
754 yield (k, v);
755 }
756 }),
757 Self::Remote(cur) => {
758 cur.into_iter_from_buffered(key.to_vec(), buffer_config)
759 .await?
760 }
761 })
762 }
763
764 pub fn iter_dup<'a, Key, Value>(
765 &'a mut self,
766 ) -> Pin<
767 Box<
768 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
769 + Send
770 + 'a,
771 >,
772 >
773 where
774 Key: TableObject + Send + 'a,
775 Value: TableObject + Send + 'a,
776 {
777 match self {
778 Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup()),
779 Self::Remote(cur) => cur.iter_dup(),
780 }
781 }
782
783 pub fn into_iter_dup_buffered<'a, Key, Value>(
784 self,
785 buffer_config: BufferConfiguration,
786 ) -> Pin<
787 Box<
788 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
789 + Send
790 + 'a,
791 >,
792 >
793 where
794 Key: TableObject + Send + 'a,
795 Value: TableObject + Send + 'a,
796 {
797 match self {
798 Self::Local(cur) => Box::pin(try_stream! {
799 for it in cur.into_iter_dup() {
800 let st = Self::intoiter_to_stream(it);
801 yield st;
802 }
803 }),
804 Self::Remote(cur) => cur.into_iter_dup_buffered(buffer_config),
805 }
806 }
807
808 pub fn iter_dup_start<'a, Key, Value>(
809 &'a mut self,
810 ) -> Pin<
811 Box<
812 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
813 + Send
814 + 'a,
815 >,
816 >
817 where
818 Key: TableObject + Send + 'a,
819 Value: TableObject + Send + 'a,
820 {
821 match self {
822 Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_start()),
823 Self::Remote(cur) => cur.iter_dup_start(),
824 }
825 }
826
827 pub fn into_iter_dup_start_buffered<'a, Key, Value>(
828 self,
829 buffer_config: BufferConfiguration,
830 ) -> Pin<
831 Box<
832 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
833 + Send
834 + 'a,
835 >,
836 >
837 where
838 Key: TableObject + Send + 'a,
839 Value: TableObject + Send + 'a,
840 {
841 match self {
842 Self::Local(cur) => Box::pin(try_stream! {
843 for it in cur.into_iter_dup_start() {
844 let st = Self::intoiter_to_stream(it);
845 yield st;
846 }
847 }),
848 Self::Remote(cur) => cur.into_iter_dup_start_buffered(buffer_config),
849 }
850 }
851
852 pub async fn iter_dup_from<'a, Key, Value>(
853 &'a mut self,
854 key: &[u8],
855 ) -> Result<
856 Pin<
857 Box<
858 dyn Stream<
859 Item = Result<
860 Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
861 >,
862 > + Send
863 + 'a,
864 >,
865 >,
866 >
867 where
868 Key: TableObject + Send + 'a,
869 Value: TableObject + Send + 'a,
870 {
871 Ok(match self {
872 Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_from(&key)),
873 Self::Remote(cur) => cur.iter_dup_from(key.to_vec()).await?,
874 })
875 }
876
877 pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
878 self,
879 key: &'a [u8],
880 buffer_config: BufferConfiguration,
881 ) -> Result<
882 Pin<
883 Box<
884 dyn Stream<
885 Item = Result<
886 Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
887 >,
888 > + Send
889 + 'a,
890 >,
891 >,
892 >
893 where
894 Key: TableObject + Send + 'a,
895 Value: TableObject + Send + 'a,
896 {
897 Ok(match self {
898 Self::Local(mut cur) => Box::pin(try_stream! {
899 for it in cur.into_iter_dup_from(&key) {
900 let st = Self::intoiter_to_stream(it);
901 yield st;
902 }
903 }),
904 Self::Remote(cur) => {
905 cur.into_iter_dup_from_buffered(key.to_vec(), buffer_config)
906 .await?
907 }
908 })
909 }
910
911 pub async fn iter_dup_of<'a, Key, Value>(
912 &'a mut self,
913 key: &[u8],
914 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
915 where
916 Key: TableObject + Send + 'a,
917 Value: TableObject + Send + 'a,
918 {
919 Ok(match self {
920 Self::Local(cur) => Self::iter_to_stream(cur.iter_dup_of(&key)),
921 Self::Remote(cur) => cur.iter_dup_of(key.to_vec()).await?,
922 })
923 }
924
925 pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
926 self,
927 key: &'a [u8],
928 buffer_config: BufferConfiguration,
929 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
930 where
931 Key: TableObject + Send + 'a,
932 Value: TableObject + Send + 'a,
933 {
934 Ok(match self {
935 Self::Local(mut cur) => Box::pin(try_stream! {
936 for it in cur.into_iter_dup_of(&key) {
937 let (k, v) = it?;
938 yield (k, v);
939 }
940 }),
941 Self::Remote(cur) => {
942 cur.into_iter_dup_of_buffered(key.to_vec(), buffer_config)
943 .await?
944 }
945 })
946 }
947}
948
949impl CursorAny<RW> {
950 pub async fn put(&mut self, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
951 match self {
952 Self::Local(cur) => Ok(cur.put(key, data, flags)?),
953 Self::Remote(cur) => Ok(cur.put(key.to_vec(), data.to_vec(), flags).await?),
954 }
955 }
956
957 pub async fn del(&mut self, flags: WriteFlags) -> Result<()> {
958 match self {
959 Self::Local(cur) => Ok(cur.del(flags)?),
960 Self::Remote(cur) => Ok(cur.del(flags).await?),
961 }
962 }
963}