1use std::{
2 collections::HashMap,
3 path::{Path, PathBuf},
4 pin::Pin,
5 time::Duration,
6 usize,
7};
8
9use async_stream::try_stream;
10use tokio_stream::Stream;
11
12use crate::{
13 remote::{
14 BufferConfiguration, ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment,
15 RemoteTransaction,
16 },
17 service::RemoteMDBXClient,
18 CommitLatency, Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder,
19 EnvironmentFlags, EnvironmentKind, Info, Mode, Stat, TableObject, Transaction, TransactionKind,
20 WriteFlags, RO, RW,
21};
22
/// Module-local result alias: every fallible function in this file reports
/// failures as [`ClientError`].
type Result<T> = std::result::Result<T, ClientError>;
24
/// An environment handle that is either opened in this process or reached
/// through a remote client, exposed behind one API.
#[derive(Debug, Clone)]
pub enum EnvironmentAny {
    /// Environment opened directly through [`EnvironmentBuilder`].
    Local(Environment),
    /// Environment accessed through a [`RemoteEnvironment`].
    Remote(RemoteEnvironment),
}
30
31impl EnvironmentAny {
32 pub fn open_local(path: &Path, builder: EnvironmentBuilder) -> Result<Self> {
33 let db = builder.open(path)?;
34
35 Ok(Self::Local(db))
36 }
37
38 pub async fn open_remote(
39 path: &Path,
40 builder: EnvironmentBuilder,
41 remote: String,
42 deadline: Duration,
43 ) -> Result<Self> {
44 let mut transport = tarpc::serde_transport::tcp::connect(
45 remote,
46 tarpc::tokio_serde::formats::Bincode::default,
47 );
48
49 transport.config_mut().max_frame_length(usize::MAX);
50
51 let transport = transport.await?;
52 let client = RemoteMDBXClient::new(tarpc::client::Config::default(), transport);
53 let env =
54 RemoteEnvironment::open_with_builder(path.to_path_buf(), builder, client, deadline)
55 .await?;
56 Ok(Self::Remote(env))
57 }
58
59 pub async fn open_with_defaults(url: &str, defaults: EnvironmentBuilder) -> Result<Self> {
60 let url = url::Url::parse(url).map_err(|e| ClientError::WrongURL(e))?;
61 let mut builder = defaults;
62
63 let args: HashMap<String, String> = url
64 .query_pairs()
65 .into_iter()
66 .map(|(k, v)| (k.into_owned(), v.into_owned()))
67 .collect();
68
69 let mode = if args.contains_key("ro") {
70 Mode::ReadOnly
71 } else if args.contains_key("rw") {
72 Mode::ReadWrite {
73 sync_mode: crate::SyncMode::Durable,
74 }
75 } else {
76 builder.flags.mode.clone()
77 };
78
79 let exclusive = if args.contains_key("exclusive") {
80 true
81 } else {
82 builder.flags.exclusive
83 };
84 let accede = if args.contains_key("accede") {
85 true
86 } else {
87 builder.flags.accede
88 };
89 let no_sub_dir = if args.contains_key("no_sub_dir") {
90 true
91 } else {
92 builder.flags.no_sub_dir
93 };
94 let flags = EnvironmentFlags {
95 mode,
96 exclusive,
97 accede,
98 no_sub_dir,
99 ..Default::default()
100 };
101
102 let max_readers = args
103 .get("max_readers")
104 .map(|t| u64::from_str_radix(&t, 10))
105 .transpose()
106 .map_err(|_| ClientError::ParseError)?
107 .or(builder.max_readers);
108 let max_dbs = args
109 .get("max_dbs")
110 .map(|t| usize::from_str_radix(&t, 10))
111 .transpose()
112 .map_err(|_| ClientError::ParseError)?
113 .or(builder.max_dbs.map(|t| t as usize));
114 let sync_bytes = args
115 .get("sync_bytes")
116 .map(|t| u64::from_str_radix(&t, 10))
117 .transpose()
118 .map_err(|_| ClientError::ParseError)?
119 .or(builder.sync_bytes);
120 let sync_period = args
121 .get("sync_period")
122 .map(|t| u64::from_str_radix(&t, 10))
123 .transpose()
124 .map_err(|_| ClientError::ParseError)?
125 .or(builder.sync_period);
126
127 builder.set_flags(flags);
128 if let Some(max_db) = max_dbs {
129 builder.set_max_dbs(max_db);
130 }
131
132 if let Some(max_readers) = max_readers {
133 builder.set_max_readers(max_readers);
134 }
135
136 if let Some(sync_bytes) = sync_bytes {
137 builder.set_sync_bytes(sync_bytes as usize);
138 }
139
140 if let Some(sync_period) = sync_period {
141 builder.set_sync_period(Duration::from_secs(sync_period));
142 }
143
144 let deadline = args
145 .get("deadline")
146 .map(|t| u64::from_str_radix(&t, 10))
147 .transpose()
148 .map_err(|_| ClientError::ParseError)?
149 .map(|t| Duration::from_secs(t))
150 .unwrap_or(Duration::from_secs(30));
151
152 match url.scheme() {
153 "file" => Self::open_local(&PathBuf::from(url.path()), builder),
154 "mdbx" => {
155 let fpath = PathBuf::from(url.path());
156 if let Some(host) = url.host_str() {
157 let target = format!("{}:{}", host, url.port().unwrap_or(1899));
158
159 Self::open_remote(&fpath, builder, target, deadline).await
160 } else {
161 Self::open_local(&PathBuf::from(url.path()), builder)
162 }
163 }
164 _ => Err(ClientError::ParseError),
165 }
166 }
167
168 pub async fn open(url: &str) -> Result<Self> {
169 let mut defaults = Environment::builder();
170 defaults
171 .set_flags(EnvironmentFlags {
172 mode: Mode::ReadOnly,
173 ..Default::default()
174 })
175 .set_max_dbs(256)
176 .set_max_readers(256);
177 Self::open_with_defaults(url, defaults).await
178 }
179
180 pub async fn begin_ro_txn(&self) -> Result<TransactionAny<RO>> {
181 match self {
182 Self::Local(env) => {
183 let env = env.clone();
184 Ok(TransactionAny::Local(
185 tokio::task::spawn_blocking(move || env.begin_ro_txn()).await??,
186 ))
187 }
188 Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_ro_txn().await?)),
189 }
190 }
191
192 pub async fn begin_rw_txn(&self) -> Result<TransactionAny<RW>> {
193 match self {
194 Self::Local(env) => {
195 let env = env.clone();
196 Ok(TransactionAny::Local(
197 tokio::task::spawn_blocking(move || env.begin_rw_txn()).await??,
198 ))
199 }
200 Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_rw_txn().await?)),
201 }
202 }
203
204 pub async fn sync(&self, force: bool) -> Result<bool> {
205 match self {
206 Self::Local(env) => {
207 let env = env.clone();
208 Ok(tokio::task::spawn_blocking(move || env.sync(force)).await??)
209 }
210 Self::Remote(env) => Ok(env.sync(force).await?),
211 }
212 }
213
214 pub async fn stat(&self) -> Result<Stat> {
215 match self {
216 Self::Local(env) => Ok(env.stat()?),
217 Self::Remote(env) => Ok(env.stat().await?),
218 }
219 }
220
221 pub async fn info(&self) -> Result<Info> {
222 match self {
223 Self::Local(env) => Ok(env.info()?),
224 Self::Remote(env) => Ok(env.info().await?),
225 }
226 }
227
228 pub fn env_kind(&self) -> EnvironmentKind {
229 match self {
230 Self::Local(env) => env.env_kind(),
231 Self::Remote(env) => env.env_kind(),
232 }
233 }
234
235 pub fn is_write_map(&self) -> bool {
236 self.env_kind().is_write_map()
237 }
238
239 pub async fn is_read_write(&self) -> Result<bool> {
240 Ok(!self.is_read_only().await?)
241 }
242
243 pub async fn is_read_only(&self) -> Result<bool> {
244 Ok(matches!(self.info().await?.mode(), Mode::ReadOnly))
245 }
246}
247
/// A database handle obtained from either a local or a remote transaction.
#[derive(Debug)]
pub enum DatabaseAny {
    /// Handle produced by a local [`Transaction`].
    Local(Database),
    /// Handle produced by a [`RemoteTransaction`].
    Remote(RemoteDatabase),
}
253
254impl DatabaseAny {
255 pub fn dbi(&self) -> u32 {
256 match self {
257 Self::Local(db) => db.dbi(),
258 Self::Remote(db) => db.dbi(),
259 }
260 }
261}
262
/// A transaction of kind `K` running against either a local or a remote
/// environment.
#[derive(Debug, Clone)]
pub enum TransactionAny<K: TransactionKind> {
    /// Transaction on a local [`Environment`].
    Local(Transaction<K>),
    /// Transaction on a [`RemoteEnvironment`].
    Remote(RemoteTransaction<K>),
}
268
269impl<K: TransactionKind> TransactionAny<K> {
270 pub async fn open_db(&self, db: Option<&str>) -> Result<DatabaseAny> {
271 match self {
272 Self::Local(tx) => {
273 let tx = tx.clone();
274 let db = db.map(|t| t.to_string());
275 Ok(DatabaseAny::Local(
276 tokio::task::spawn_blocking(move || {
277 tx.open_db(db.as_ref().map(|t| t.as_str()))
278 })
279 .await??,
280 ))
281 }
282 Self::Remote(tx) => Ok(DatabaseAny::Remote(
283 tx.open_db(db.map(|t| t.to_string())).await?,
284 )),
285 }
286 }
287
288 pub async fn get<V: TableObject>(&self, dbi: u32, key: &[u8]) -> Result<Option<V>> {
289 match self {
290 Self::Local(tx) => Ok(tx.get::<V>(dbi, key)?),
291 Self::Remote(tx) => Ok(tx.get::<V>(dbi, key.to_vec()).await?),
292 }
293 }
294
295 pub async fn db_stat(&self, db: &DatabaseAny) -> Result<Stat> {
296 self.db_stat_with_dbi(db.dbi()).await
297 }
298
299 pub async fn db_stat_with_dbi(&self, dbi: u32) -> Result<Stat> {
300 match self {
301 Self::Local(tx) => Ok(tx.db_stat_with_dbi(dbi)?),
302 Self::Remote(tx) => Ok(tx.db_stat_with_dbi(dbi).await?),
303 }
304 }
305
306 pub async fn cursor_with_dbi(&self, dbi: u32) -> Result<CursorAny<K>> {
307 match self {
308 Self::Local(tx) => Ok(CursorAny::Local(tx.cursor_with_dbi(dbi)?)),
309 Self::Remote(tx) => Ok(CursorAny::Remote(tx.cursor(dbi).await?)),
310 }
311 }
312
313 pub async fn cursor(&self, db: &DatabaseAny) -> Result<CursorAny<K>> {
314 self.cursor_with_dbi(db.dbi()).await
315 }
316}
317
318impl TransactionAny<RW> {
319 pub async fn begin_nested_txn(&mut self) -> Result<Self> {
320 match self {
321 Self::Local(tx) => Ok(Self::Local(tx.begin_nested_txn()?)),
322 Self::Remote(tx) => Ok(Self::Remote(tx.begin_nested_txn().await?)),
323 }
324 }
325
326 pub async fn clear_db(&self, dbi: u32) -> Result<()> {
327 match self {
328 Self::Local(tx) => Ok(tx.clear_db(dbi)?),
329 Self::Remote(tx) => Ok(tx.clear_db(dbi).await?),
330 }
331 }
332
333 pub async fn put(&self, dbi: u32, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
334 match self {
335 Self::Local(tx) => Ok(tx.put(dbi, key, data, flags)?),
336 Self::Remote(tx) => Ok(tx.put(dbi, key.to_vec(), data.to_vec(), flags).await?),
337 }
338 }
339
340 pub async fn del(&self, dbi: u32, key: &[u8], value: Option<&[u8]>) -> Result<bool> {
341 match self {
342 Self::Local(tx) => Ok(tx.del(dbi, key, value)?),
343 Self::Remote(tx) => Ok(tx.del(dbi, key.to_vec(), value.map(|t| t.to_vec())).await?),
344 }
345 }
346
347 pub async fn create_db(&self, db: Option<&str>, flags: DatabaseFlags) -> Result<DatabaseAny> {
348 match self {
349 Self::Local(tx) => {
350 let tx = tx.clone();
351 let db = db.map(|t| t.to_string());
352 Ok(DatabaseAny::Local(
353 tokio::task::spawn_blocking(move || {
354 tx.create_db(db.as_ref().map(|t| t.as_str()), flags)
355 })
356 .await??,
357 ))
358 }
359 Self::Remote(tx) => Ok(DatabaseAny::Remote(
360 tx.create_db(db.map(|t| t.to_string()), flags).await?,
361 )),
362 }
363 }
364
365 pub async fn commit(self) -> Result<(bool, CommitLatency)> {
366 match self {
367 Self::Local(tx) => Ok(tokio::task::spawn_blocking(move || tx.commit()).await??),
368 Self::Remote(tx) => Ok(tx.commit().await?),
369 }
370 }
371}
372
/// A cursor of transaction kind `K` over either a local or a remote database.
#[derive(Debug)]
pub enum CursorAny<K: TransactionKind> {
    /// Cursor created by a local [`Transaction`].
    Local(Cursor<K>),
    /// Cursor created by a [`RemoteTransaction`].
    Remote(RemoteCursor<K>),
}
378
379impl<K: TransactionKind> CursorAny<K> {
380 pub async fn cursor_clone(&self) -> Result<Self> {
381 match self {
382 Self::Local(cur) => Ok(Self::Local(cur.clone())),
383 Self::Remote(cur) => Ok(Self::Remote(cur.cur_clone().await?)),
384 }
385 }
386
387 pub async fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
388 where
389 Key: TableObject,
390 Value: TableObject,
391 {
392 match self {
393 Self::Local(cur) => Ok(cur.first()?),
394 Self::Remote(cur) => Ok(cur.first().await?),
395 }
396 }
397
398 pub async fn first_dup<Value>(&mut self) -> Result<Option<Value>>
399 where
400 Value: TableObject,
401 {
402 match self {
403 Self::Local(cur) => Ok(cur.first_dup()?),
404 Self::Remote(cur) => Ok(cur.first_dup().await?),
405 }
406 }
407
408 pub async fn get_both<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
409 where
410 Value: TableObject,
411 {
412 match self {
413 Self::Local(cur) => Ok(cur.get_both(k, v)?),
414 Self::Remote(cur) => Ok(cur.get_both(k.to_vec(), v.to_vec()).await?),
415 }
416 }
417
418 pub async fn get_both_range<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
419 where
420 Value: TableObject,
421 {
422 match self {
423 Self::Local(cur) => Ok(cur.get_both_range(k, v)?),
424 Self::Remote(cur) => Ok(cur.get_both_range(k.to_vec(), v.to_vec()).await?),
425 }
426 }
427
428 pub async fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
429 where
430 Key: TableObject,
431 Value: TableObject,
432 {
433 match self {
434 Self::Local(cur) => Ok(cur.get_current()?),
435 Self::Remote(cur) => Ok(cur.get_current().await?),
436 }
437 }
438
439 pub async fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
440 where
441 Value: TableObject,
442 {
443 match self {
444 Self::Local(cur) => Ok(cur.get_multiple()?),
445 Self::Remote(cur) => Ok(cur.get_multiple().await?),
446 }
447 }
448
449 pub async fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
450 where
451 Key: TableObject,
452 Value: TableObject,
453 {
454 match self {
455 Self::Local(cur) => Ok(cur.last()?),
456 Self::Remote(cur) => Ok(cur.last().await?),
457 }
458 }
459
460 pub async fn last_dup<Value>(&mut self) -> Result<Option<Value>>
461 where
462 Value: TableObject,
463 {
464 match self {
465 Self::Local(cur) => Ok(cur.last_dup()?),
466 Self::Remote(cur) => Ok(cur.last_dup().await?),
467 }
468 }
469
470 pub async fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
471 where
472 Key: TableObject,
473 Value: TableObject,
474 {
475 match self {
476 Self::Local(cur) => Ok(cur.next()?),
477 Self::Remote(cur) => Ok(cur.next().await?),
478 }
479 }
480
481 pub async fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
482 where
483 Key: TableObject,
484 Value: TableObject,
485 {
486 match self {
487 Self::Local(cur) => Ok(cur.next_dup()?),
488 Self::Remote(cur) => Ok(cur.next_dup().await?),
489 }
490 }
491
492 pub async fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
493 where
494 Key: TableObject,
495 Value: TableObject,
496 {
497 match self {
498 Self::Local(cur) => Ok(cur.next_multiple()?),
499 Self::Remote(cur) => Ok(cur.next_multiple().await?),
500 }
501 }
502
503 pub async fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
504 where
505 Key: TableObject,
506 Value: TableObject,
507 {
508 match self {
509 Self::Local(cur) => Ok(cur.next_nodup()?),
510 Self::Remote(cur) => Ok(cur.next_nodup().await?),
511 }
512 }
513
514 pub async fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
515 where
516 Key: TableObject,
517 Value: TableObject,
518 {
519 match self {
520 Self::Local(cur) => Ok(cur.prev()?),
521 Self::Remote(cur) => Ok(cur.prev().await?),
522 }
523 }
524
525 pub async fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
526 where
527 Key: TableObject,
528 Value: TableObject,
529 {
530 match self {
531 Self::Local(cur) => Ok(cur.prev_dup()?),
532 Self::Remote(cur) => Ok(cur.prev_dup().await?),
533 }
534 }
535
536 pub async fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
537 where
538 Key: TableObject,
539 Value: TableObject,
540 {
541 match self {
542 Self::Local(cur) => Ok(cur.prev_nodup()?),
543 Self::Remote(cur) => Ok(cur.prev_nodup().await?),
544 }
545 }
546
547 pub async fn set<Value>(&mut self, key: &[u8]) -> Result<Option<Value>>
548 where
549 Value: TableObject,
550 {
551 match self {
552 Self::Local(cur) => Ok(cur.set(key)?),
553 Self::Remote(cur) => Ok(cur.set(key.to_vec()).await?),
554 }
555 }
556 pub async fn set_key<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
557 where
558 Key: TableObject,
559 Value: TableObject,
560 {
561 match self {
562 Self::Local(cur) => Ok(cur.set_key(key)?),
563 Self::Remote(cur) => Ok(cur.set_key(key.to_vec()).await?),
564 }
565 }
566
567 pub async fn set_range<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
568 where
569 Key: TableObject,
570 Value: TableObject,
571 {
572 match self {
573 Self::Local(cur) => Ok(cur.set_range(key)?),
574 Self::Remote(cur) => Ok(cur.set_range(key.to_vec()).await?),
575 }
576 }
577
578 pub async fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
579 where
580 Key: TableObject,
581 Value: TableObject,
582 {
583 match self {
584 Self::Local(cur) => Ok(cur.prev_multiple()?),
585 Self::Remote(cur) => Ok(cur.prev_multiple().await?),
586 }
587 }
588
589 pub async fn set_lowerbound<Key, Value>(
590 &mut self,
591 key: &[u8],
592 ) -> Result<Option<(bool, Key, Value)>>
593 where
594 Key: TableObject,
595 Value: TableObject,
596 {
597 match self {
598 Self::Local(cur) => Ok(cur.set_lowerbound(key)?),
599 Self::Remote(cur) => Ok(cur.set_lowerbound(key.to_vec()).await?),
600 }
601 }
602
603 fn iter_to_stream<'cur, Key, Value>(
604 itr: crate::cursor::Iter<'cur, K, Key, Value>,
605 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
606 where
607 Key: TableObject + Send + 'cur,
608 Value: TableObject + Send + 'cur,
609 {
610 Box::pin(try_stream! {
611 for it in itr {
612 let (k, v) = it?;
613 yield (k, v);
614 }
615 })
616 }
617
618 fn intoiter_to_stream<'cur, Key, Value>(
619 itr: crate::cursor::IntoIter<'cur, K, Key, Value>,
620 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
621 where
622 Key: TableObject + Send + 'cur,
623 Value: TableObject + Send + 'cur,
624 {
625 Box::pin(try_stream! {
626 for it in itr {
627 let (k, v) = it?;
628 yield (k, v);
629 }
630 })
631 }
632
633 fn iterdup_to_steam<'cur, Key, Value>(
634 iterdup: crate::cursor::IterDup<'cur, K, Key, Value>,
635 ) -> Pin<
636 Box<
637 dyn Stream<
638 Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>>,
639 > + Send
640 + 'cur,
641 >,
642 >
643 where
644 Key: TableObject + Send + 'cur,
645 Value: TableObject + Send + 'cur,
646 {
647 Box::pin(try_stream! {
648 for it in iterdup {
649 let st = Self::intoiter_to_stream(it);
650 yield st;
651 }
652 })
653 }
654
655 pub fn iter<'a, Key, Value>(
656 &'a mut self,
657 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
658 where
659 Key: TableObject + Send + 'a,
660 Value: TableObject + Send + 'a,
661 {
662 match self {
663 Self::Local(cur) => Self::iter_to_stream(cur.iter::<Key, Value>()),
664 Self::Remote(cur) => cur.iter(),
665 }
666 }
667
668 pub fn into_iter_buffered<'a, Key, Value>(
669 self,
670 buffer_config: BufferConfiguration,
671 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
672 where
673 Key: TableObject + Send + 'a,
674 Value: TableObject + Send + 'a,
675 {
676 match self {
677 Self::Local(mut cur) => Box::pin(try_stream! {
678 for it in cur.iter::<Key, Value>() {
679 let (k, v) = it?;
680 yield (k, v);
681 }
682 }),
683 Self::Remote(cur) => cur.into_iter_buffered(buffer_config),
684 }
685 }
686
687 pub fn iter_start<'a, Key, Value>(
688 &'a mut self,
689 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
690 where
691 Key: TableObject + Send + 'a,
692 Value: TableObject + Send + 'a,
693 {
694 match self {
695 Self::Local(cur) => Self::iter_to_stream(cur.iter_start::<Key, Value>()),
696 Self::Remote(cur) => cur.iter_start(),
697 }
698 }
699
700 pub fn into_iter_start_buffered<'a, Key, Value>(
701 self,
702 buffer_config: BufferConfiguration,
703 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
704 where
705 Key: TableObject + Send + 'a,
706 Value: TableObject + Send + 'a,
707 {
708 match self {
709 Self::Local(mut cur) => Box::pin(try_stream! {
710 for it in cur.iter_start::<Key, Value>() {
711 let (k, v) = it?;
712 yield (k, v);
713 }
714 }),
715 Self::Remote(cur) => cur.into_iter_start_buffered(buffer_config),
716 }
717 }
718
719 pub async fn iter_from<'a, Key, Value>(
720 &'a mut self,
721 key: &[u8],
722 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
723 where
724 Key: TableObject + Send + 'a,
725 Value: TableObject + Send + 'a,
726 {
727 Ok(match self {
728 Self::Local(cur) => Self::iter_to_stream(cur.iter_from::<Key, Value>(&key)),
729 Self::Remote(cur) => cur.iter_from(key.to_vec()).await?,
730 })
731 }
732
733 pub async fn into_iter_from_buffered<'a, Key, Value>(
734 self,
735 key: &'a [u8],
736 buffer_config: BufferConfiguration,
737 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
738 where
739 Key: TableObject + Send + 'a,
740 Value: TableObject + Send + 'a,
741 {
742 Ok(match self {
743 Self::Local(mut cur) => Box::pin(try_stream! {
744 for it in cur.iter_from::<Key, Value>(&key) {
745 let (k, v) = it?;
746 yield (k, v);
747 }
748 }),
749 Self::Remote(cur) => {
750 cur.into_iter_from_buffered(key.to_vec(), buffer_config)
751 .await?
752 }
753 })
754 }
755
756 pub fn iter_dup<'a, Key, Value>(
757 &'a mut self,
758 ) -> Pin<
759 Box<
760 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
761 + Send
762 + 'a,
763 >,
764 >
765 where
766 Key: TableObject + Send + 'a,
767 Value: TableObject + Send + 'a,
768 {
769 match self {
770 Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup()),
771 Self::Remote(cur) => cur.iter_dup(),
772 }
773 }
774
775 pub fn into_iter_dup_buffered<'a, Key, Value>(
776 self,
777 buffer_config: BufferConfiguration,
778 ) -> Pin<
779 Box<
780 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
781 + Send
782 + 'a,
783 >,
784 >
785 where
786 Key: TableObject + Send + 'a,
787 Value: TableObject + Send + 'a,
788 {
789 match self {
790 Self::Local(cur) => Box::pin(try_stream! {
791 for it in cur.into_iter_dup() {
792 let st = Self::intoiter_to_stream(it);
793 yield st;
794 }
795 }),
796 Self::Remote(cur) => cur.into_iter_dup_buffered(buffer_config),
797 }
798 }
799
800 pub fn iter_dup_start<'a, Key, Value>(
801 &'a mut self,
802 ) -> Pin<
803 Box<
804 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
805 + Send
806 + 'a,
807 >,
808 >
809 where
810 Key: TableObject + Send + 'a,
811 Value: TableObject + Send + 'a,
812 {
813 match self {
814 Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_start()),
815 Self::Remote(cur) => cur.iter_dup_start(),
816 }
817 }
818
819 pub fn into_iter_dup_start_buffered<'a, Key, Value>(
820 self,
821 buffer_config: BufferConfiguration,
822 ) -> Pin<
823 Box<
824 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
825 + Send
826 + 'a,
827 >,
828 >
829 where
830 Key: TableObject + Send + 'a,
831 Value: TableObject + Send + 'a,
832 {
833 match self {
834 Self::Local(cur) => Box::pin(try_stream! {
835 for it in cur.into_iter_dup_start() {
836 let st = Self::intoiter_to_stream(it);
837 yield st;
838 }
839 }),
840 Self::Remote(cur) => cur.into_iter_dup_start_buffered(buffer_config),
841 }
842 }
843
844 pub async fn iter_dup_from<'a, Key, Value>(
845 &'a mut self,
846 key: &[u8],
847 ) -> Result<
848 Pin<
849 Box<
850 dyn Stream<
851 Item = Result<
852 Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
853 >,
854 > + Send
855 + 'a,
856 >,
857 >,
858 >
859 where
860 Key: TableObject + Send + 'a,
861 Value: TableObject + Send + 'a,
862 {
863 Ok(match self {
864 Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_from(&key)),
865 Self::Remote(cur) => cur.iter_dup_from(key.to_vec()).await?,
866 })
867 }
868
869 pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
870 self,
871 key: &'a [u8],
872 buffer_config: BufferConfiguration,
873 ) -> Result<
874 Pin<
875 Box<
876 dyn Stream<
877 Item = Result<
878 Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
879 >,
880 > + Send
881 + 'a,
882 >,
883 >,
884 >
885 where
886 Key: TableObject + Send + 'a,
887 Value: TableObject + Send + 'a,
888 {
889 Ok(match self {
890 Self::Local(mut cur) => Box::pin(try_stream! {
891 for it in cur.into_iter_dup_from(&key) {
892 let st = Self::intoiter_to_stream(it);
893 yield st;
894 }
895 }),
896 Self::Remote(cur) => {
897 cur.into_iter_dup_from_buffered(key.to_vec(), buffer_config)
898 .await?
899 }
900 })
901 }
902
903 pub async fn iter_dup_of<'a, Key, Value>(
904 &'a mut self,
905 key: &[u8],
906 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
907 where
908 Key: TableObject + Send + 'a,
909 Value: TableObject + Send + 'a,
910 {
911 Ok(match self {
912 Self::Local(cur) => Self::iter_to_stream(cur.iter_dup_of(&key)),
913 Self::Remote(cur) => cur.iter_dup_of(key.to_vec()).await?,
914 })
915 }
916
917 pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
918 self,
919 key: &'a [u8],
920 buffer_config: BufferConfiguration,
921 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
922 where
923 Key: TableObject + Send + 'a,
924 Value: TableObject + Send + 'a,
925 {
926 Ok(match self {
927 Self::Local(mut cur) => Box::pin(try_stream! {
928 for it in cur.into_iter_dup_of(&key) {
929 let (k, v) = it?;
930 yield (k, v);
931 }
932 }),
933 Self::Remote(cur) => {
934 cur.into_iter_dup_of_buffered(key.to_vec(), buffer_config)
935 .await?
936 }
937 })
938 }
939}
940
941impl CursorAny<RW> {
942 pub async fn put(&mut self, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
943 match self {
944 Self::Local(cur) => Ok(cur.put(key, data, flags)?),
945 Self::Remote(cur) => Ok(cur.put(key.to_vec(), data.to_vec(), flags).await?),
946 }
947 }
948
949 pub async fn del(&mut self, flags: WriteFlags) -> Result<()> {
950 match self {
951 Self::Local(cur) => Ok(cur.del(flags)?),
952 Self::Remote(cur) => Ok(cur.del(flags).await?),
953 }
954 }
955}