libmdbx_remote/
any.rs

1use std::{
2    collections::HashMap,
3    path::{Path, PathBuf},
4    pin::Pin,
5    time::Duration,
6    usize,
7};
8
9use async_stream::try_stream;
10use tokio_stream::Stream;
11
12use crate::{
13    remote::{
14        BufferConfiguration, ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment,
15        RemoteTransaction,
16    },
17    service::RemoteMDBXClient,
18    CommitLatency, Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder,
19    EnvironmentFlags, EnvironmentKind, Info, Mode, Stat, TableObject, Transaction, TransactionKind,
20    WriteFlags, RO, RW,
21};
22
23type Result<T> = std::result::Result<T, ClientError>;
24
25#[derive(Debug, Clone)]
26pub enum EnvironmentAny {
27    Local(Environment),
28    Remote(RemoteEnvironment),
29}
30
31impl EnvironmentAny {
32    pub fn open_local(path: &Path, builder: EnvironmentBuilder) -> Result<Self> {
33        let db = builder.open(path)?;
34
35        Ok(Self::Local(db))
36    }
37
38    pub async fn open_remote(
39        path: &Path,
40        builder: EnvironmentBuilder,
41        remote: String,
42        deadline: Duration,
43    ) -> Result<Self> {
44        let mut transport = tarpc::serde_transport::tcp::connect(
45            remote,
46            tarpc::tokio_serde::formats::Bincode::default,
47        );
48
49        transport.config_mut().max_frame_length(usize::MAX);
50
51        let transport = transport.await?;
52        let client = RemoteMDBXClient::new(tarpc::client::Config::default(), transport);
53        let env =
54            RemoteEnvironment::open_with_builder(path.to_path_buf(), builder, client, deadline)
55                .await?;
56        Ok(Self::Remote(env))
57    }
58
59    pub async fn open_with_defaults(url: &str, defaults: EnvironmentBuilder) -> Result<Self> {
60        let url = url::Url::parse(url).map_err(|e| ClientError::WrongURL(e))?;
61        let mut builder = defaults;
62
63        let args: HashMap<String, String> = url
64            .query_pairs()
65            .into_iter()
66            .map(|(k, v)| (k.into_owned(), v.into_owned()))
67            .collect();
68
69        let mode = if args.contains_key("ro") {
70            Mode::ReadOnly
71        } else if args.contains_key("rw") {
72            Mode::ReadWrite {
73                sync_mode: crate::SyncMode::Durable,
74            }
75        } else {
76            builder.flags.mode.clone()
77        };
78
79        let exclusive = if args.contains_key("exclusive") {
80            true
81        } else {
82            builder.flags.exclusive
83        };
84        let accede = if args.contains_key("accede") {
85            true
86        } else {
87            builder.flags.accede
88        };
89        let no_sub_dir = if args.contains_key("no_sub_dir") {
90            true
91        } else {
92            builder.flags.no_sub_dir
93        };
94        let flags = EnvironmentFlags {
95            mode,
96            exclusive,
97            accede,
98            no_sub_dir,
99            ..Default::default()
100        };
101
102        let max_readers = args
103            .get("max_readers")
104            .map(|t| u64::from_str_radix(&t, 10))
105            .transpose()
106            .map_err(|_| ClientError::ParseError)?
107            .or(builder.max_readers);
108        let max_dbs = args
109            .get("max_dbs")
110            .map(|t| usize::from_str_radix(&t, 10))
111            .transpose()
112            .map_err(|_| ClientError::ParseError)?
113            .or(builder.max_dbs.map(|t| t as usize));
114        let sync_bytes = args
115            .get("sync_bytes")
116            .map(|t| u64::from_str_radix(&t, 10))
117            .transpose()
118            .map_err(|_| ClientError::ParseError)?
119            .or(builder.sync_bytes);
120        let sync_period = args
121            .get("sync_period")
122            .map(|t| u64::from_str_radix(&t, 10))
123            .transpose()
124            .map_err(|_| ClientError::ParseError)?
125            .or(builder.sync_period);
126
127        builder.set_flags(flags);
128        if let Some(max_db) = max_dbs {
129            builder.set_max_dbs(max_db);
130        }
131
132        if let Some(max_readers) = max_readers {
133            builder.set_max_readers(max_readers);
134        }
135
136        if let Some(sync_bytes) = sync_bytes {
137            builder.set_sync_bytes(sync_bytes as usize);
138        }
139
140        if let Some(sync_period) = sync_period {
141            builder.set_sync_period(Duration::from_secs(sync_period));
142        }
143
144        let deadline = args
145            .get("deadline")
146            .map(|t| u64::from_str_radix(&t, 10))
147            .transpose()
148            .map_err(|_| ClientError::ParseError)?
149            .map(|t| Duration::from_secs(t))
150            .unwrap_or(Duration::from_secs(30));
151
152        match url.scheme() {
153            "file" => Self::open_local(&PathBuf::from(url.path()), builder),
154            "mdbx" => {
155                let fpath = PathBuf::from(url.path());
156                if let Some(host) = url.host_str() {
157                    let target = format!("{}:{}", host, url.port().unwrap_or(1899));
158
159                    Self::open_remote(&fpath, builder, target, deadline).await
160                } else {
161                    Self::open_local(&PathBuf::from(url.path()), builder)
162                }
163            }
164            _ => Err(ClientError::ParseError),
165        }
166    }
167
168    pub async fn open(url: &str) -> Result<Self> {
169        let mut defaults = Environment::builder();
170        defaults
171            .set_flags(EnvironmentFlags {
172                mode: Mode::ReadOnly,
173                ..Default::default()
174            })
175            .set_max_dbs(256)
176            .set_max_readers(256);
177        Self::open_with_defaults(url, defaults).await
178    }
179
180    pub async fn begin_ro_txn(&self) -> Result<TransactionAny<RO>> {
181        match self {
182            Self::Local(env) => {
183                let env = env.clone();
184                Ok(TransactionAny::Local(
185                    tokio::task::spawn_blocking(move || env.begin_ro_txn()).await??,
186                ))
187            }
188            Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_ro_txn().await?)),
189        }
190    }
191
192    pub async fn begin_rw_txn(&self) -> Result<TransactionAny<RW>> {
193        match self {
194            Self::Local(env) => {
195                let env = env.clone();
196                Ok(TransactionAny::Local(
197                    tokio::task::spawn_blocking(move || env.begin_rw_txn()).await??,
198                ))
199            }
200            Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_rw_txn().await?)),
201        }
202    }
203
204    pub async fn sync(&self, force: bool) -> Result<bool> {
205        match self {
206            Self::Local(env) => {
207                let env = env.clone();
208                Ok(tokio::task::spawn_blocking(move || env.sync(force)).await??)
209            }
210            Self::Remote(env) => Ok(env.sync(force).await?),
211        }
212    }
213
214    pub async fn stat(&self) -> Result<Stat> {
215        match self {
216            Self::Local(env) => Ok(env.stat()?),
217            Self::Remote(env) => Ok(env.stat().await?),
218        }
219    }
220
221    pub async fn info(&self) -> Result<Info> {
222        match self {
223            Self::Local(env) => Ok(env.info()?),
224            Self::Remote(env) => Ok(env.info().await?),
225        }
226    }
227
228    pub fn env_kind(&self) -> EnvironmentKind {
229        match self {
230            Self::Local(env) => env.env_kind(),
231            Self::Remote(env) => env.env_kind(),
232        }
233    }
234
235    pub fn is_write_map(&self) -> bool {
236        self.env_kind().is_write_map()
237    }
238
239    pub async fn is_read_write(&self) -> Result<bool> {
240        Ok(!self.is_read_only().await?)
241    }
242
243    pub async fn is_read_only(&self) -> Result<bool> {
244        Ok(matches!(self.info().await?.mode(), Mode::ReadOnly))
245    }
246}
247
248#[derive(Debug)]
249pub enum DatabaseAny {
250    Local(Database),
251    Remote(RemoteDatabase),
252}
253
254impl DatabaseAny {
255    pub fn dbi(&self) -> u32 {
256        match self {
257            Self::Local(db) => db.dbi(),
258            Self::Remote(db) => db.dbi(),
259        }
260    }
261}
262
263#[derive(Debug, Clone)]
264pub enum TransactionAny<K: TransactionKind> {
265    Local(Transaction<K>),
266    Remote(RemoteTransaction<K>),
267}
268
269impl<K: TransactionKind> TransactionAny<K> {
270    pub async fn open_db(&self, db: Option<&str>) -> Result<DatabaseAny> {
271        match self {
272            Self::Local(tx) => {
273                let tx = tx.clone();
274                let db = db.map(|t| t.to_string());
275                Ok(DatabaseAny::Local(
276                    tokio::task::spawn_blocking(move || {
277                        tx.open_db(db.as_ref().map(|t| t.as_str()))
278                    })
279                    .await??,
280                ))
281            }
282            Self::Remote(tx) => Ok(DatabaseAny::Remote(
283                tx.open_db(db.map(|t| t.to_string())).await?,
284            )),
285        }
286    }
287
288    pub async fn get<V: TableObject>(&self, dbi: u32, key: &[u8]) -> Result<Option<V>> {
289        match self {
290            Self::Local(tx) => Ok(tx.get::<V>(dbi, key)?),
291            Self::Remote(tx) => Ok(tx.get::<V>(dbi, key.to_vec()).await?),
292        }
293    }
294
295    pub async fn db_stat(&self, db: &DatabaseAny) -> Result<Stat> {
296        self.db_stat_with_dbi(db.dbi()).await
297    }
298
299    pub async fn db_stat_with_dbi(&self, dbi: u32) -> Result<Stat> {
300        match self {
301            Self::Local(tx) => Ok(tx.db_stat_with_dbi(dbi)?),
302            Self::Remote(tx) => Ok(tx.db_stat_with_dbi(dbi).await?),
303        }
304    }
305
306    pub async fn cursor_with_dbi(&self, dbi: u32) -> Result<CursorAny<K>> {
307        match self {
308            Self::Local(tx) => Ok(CursorAny::Local(tx.cursor_with_dbi(dbi)?)),
309            Self::Remote(tx) => Ok(CursorAny::Remote(tx.cursor(dbi).await?)),
310        }
311    }
312
313    pub async fn cursor(&self, db: &DatabaseAny) -> Result<CursorAny<K>> {
314        self.cursor_with_dbi(db.dbi()).await
315    }
316}
317
318impl TransactionAny<RW> {
319    pub async fn begin_nested_txn(&mut self) -> Result<Self> {
320        match self {
321            Self::Local(tx) => Ok(Self::Local(tx.begin_nested_txn()?)),
322            Self::Remote(tx) => Ok(Self::Remote(tx.begin_nested_txn().await?)),
323        }
324    }
325
326    pub async fn clear_db(&self, dbi: u32) -> Result<()> {
327        match self {
328            Self::Local(tx) => Ok(tx.clear_db(dbi)?),
329            Self::Remote(tx) => Ok(tx.clear_db(dbi).await?),
330        }
331    }
332
333    pub async fn put(&self, dbi: u32, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
334        match self {
335            Self::Local(tx) => Ok(tx.put(dbi, key, data, flags)?),
336            Self::Remote(tx) => Ok(tx.put(dbi, key.to_vec(), data.to_vec(), flags).await?),
337        }
338    }
339
340    pub async fn del(&self, dbi: u32, key: &[u8], value: Option<&[u8]>) -> Result<bool> {
341        match self {
342            Self::Local(tx) => Ok(tx.del(dbi, key, value)?),
343            Self::Remote(tx) => Ok(tx.del(dbi, key.to_vec(), value.map(|t| t.to_vec())).await?),
344        }
345    }
346
347    pub async fn create_db(&self, db: Option<&str>, flags: DatabaseFlags) -> Result<DatabaseAny> {
348        match self {
349            Self::Local(tx) => {
350                let tx = tx.clone();
351                let db = db.map(|t| t.to_string());
352                Ok(DatabaseAny::Local(
353                    tokio::task::spawn_blocking(move || {
354                        tx.create_db(db.as_ref().map(|t| t.as_str()), flags)
355                    })
356                    .await??,
357                ))
358            }
359            Self::Remote(tx) => Ok(DatabaseAny::Remote(
360                tx.create_db(db.map(|t| t.to_string()), flags).await?,
361            )),
362        }
363    }
364
365    pub async fn commit(self) -> Result<(bool, CommitLatency)> {
366        match self {
367            Self::Local(tx) => Ok(tokio::task::spawn_blocking(move || tx.commit()).await??),
368            Self::Remote(tx) => Ok(tx.commit().await?),
369        }
370    }
371}
372
373#[derive(Debug)]
374pub enum CursorAny<K: TransactionKind> {
375    Local(Cursor<K>),
376    Remote(RemoteCursor<K>),
377}
378
379impl<K: TransactionKind> CursorAny<K> {
380    pub async fn cursor_clone(&self) -> Result<Self> {
381        match self {
382            Self::Local(cur) => Ok(Self::Local(cur.clone())),
383            Self::Remote(cur) => Ok(Self::Remote(cur.cur_clone().await?)),
384        }
385    }
386
387    pub async fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
388    where
389        Key: TableObject,
390        Value: TableObject,
391    {
392        match self {
393            Self::Local(cur) => Ok(cur.first()?),
394            Self::Remote(cur) => Ok(cur.first().await?),
395        }
396    }
397
398    pub async fn first_dup<Value>(&mut self) -> Result<Option<Value>>
399    where
400        Value: TableObject,
401    {
402        match self {
403            Self::Local(cur) => Ok(cur.first_dup()?),
404            Self::Remote(cur) => Ok(cur.first_dup().await?),
405        }
406    }
407
408    pub async fn get_both<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
409    where
410        Value: TableObject,
411    {
412        match self {
413            Self::Local(cur) => Ok(cur.get_both(k, v)?),
414            Self::Remote(cur) => Ok(cur.get_both(k.to_vec(), v.to_vec()).await?),
415        }
416    }
417
418    pub async fn get_both_range<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
419    where
420        Value: TableObject,
421    {
422        match self {
423            Self::Local(cur) => Ok(cur.get_both_range(k, v)?),
424            Self::Remote(cur) => Ok(cur.get_both_range(k.to_vec(), v.to_vec()).await?),
425        }
426    }
427
428    pub async fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
429    where
430        Key: TableObject,
431        Value: TableObject,
432    {
433        match self {
434            Self::Local(cur) => Ok(cur.get_current()?),
435            Self::Remote(cur) => Ok(cur.get_current().await?),
436        }
437    }
438
439    pub async fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
440    where
441        Value: TableObject,
442    {
443        match self {
444            Self::Local(cur) => Ok(cur.get_multiple()?),
445            Self::Remote(cur) => Ok(cur.get_multiple().await?),
446        }
447    }
448
449    pub async fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
450    where
451        Key: TableObject,
452        Value: TableObject,
453    {
454        match self {
455            Self::Local(cur) => Ok(cur.last()?),
456            Self::Remote(cur) => Ok(cur.last().await?),
457        }
458    }
459
460    pub async fn last_dup<Value>(&mut self) -> Result<Option<Value>>
461    where
462        Value: TableObject,
463    {
464        match self {
465            Self::Local(cur) => Ok(cur.last_dup()?),
466            Self::Remote(cur) => Ok(cur.last_dup().await?),
467        }
468    }
469
470    pub async fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
471    where
472        Key: TableObject,
473        Value: TableObject,
474    {
475        match self {
476            Self::Local(cur) => Ok(cur.next()?),
477            Self::Remote(cur) => Ok(cur.next().await?),
478        }
479    }
480
481    pub async fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
482    where
483        Key: TableObject,
484        Value: TableObject,
485    {
486        match self {
487            Self::Local(cur) => Ok(cur.next_dup()?),
488            Self::Remote(cur) => Ok(cur.next_dup().await?),
489        }
490    }
491
492    pub async fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
493    where
494        Key: TableObject,
495        Value: TableObject,
496    {
497        match self {
498            Self::Local(cur) => Ok(cur.next_multiple()?),
499            Self::Remote(cur) => Ok(cur.next_multiple().await?),
500        }
501    }
502
503    pub async fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
504    where
505        Key: TableObject,
506        Value: TableObject,
507    {
508        match self {
509            Self::Local(cur) => Ok(cur.next_nodup()?),
510            Self::Remote(cur) => Ok(cur.next_nodup().await?),
511        }
512    }
513
514    pub async fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
515    where
516        Key: TableObject,
517        Value: TableObject,
518    {
519        match self {
520            Self::Local(cur) => Ok(cur.prev()?),
521            Self::Remote(cur) => Ok(cur.prev().await?),
522        }
523    }
524
525    pub async fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
526    where
527        Key: TableObject,
528        Value: TableObject,
529    {
530        match self {
531            Self::Local(cur) => Ok(cur.prev_dup()?),
532            Self::Remote(cur) => Ok(cur.prev_dup().await?),
533        }
534    }
535
536    pub async fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
537    where
538        Key: TableObject,
539        Value: TableObject,
540    {
541        match self {
542            Self::Local(cur) => Ok(cur.prev_nodup()?),
543            Self::Remote(cur) => Ok(cur.prev_nodup().await?),
544        }
545    }
546
547    pub async fn set<Value>(&mut self, key: &[u8]) -> Result<Option<Value>>
548    where
549        Value: TableObject,
550    {
551        match self {
552            Self::Local(cur) => Ok(cur.set(key)?),
553            Self::Remote(cur) => Ok(cur.set(key.to_vec()).await?),
554        }
555    }
556    pub async fn set_key<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
557    where
558        Key: TableObject,
559        Value: TableObject,
560    {
561        match self {
562            Self::Local(cur) => Ok(cur.set_key(key)?),
563            Self::Remote(cur) => Ok(cur.set_key(key.to_vec()).await?),
564        }
565    }
566
567    pub async fn set_range<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
568    where
569        Key: TableObject,
570        Value: TableObject,
571    {
572        match self {
573            Self::Local(cur) => Ok(cur.set_range(key)?),
574            Self::Remote(cur) => Ok(cur.set_range(key.to_vec()).await?),
575        }
576    }
577
578    pub async fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
579    where
580        Key: TableObject,
581        Value: TableObject,
582    {
583        match self {
584            Self::Local(cur) => Ok(cur.prev_multiple()?),
585            Self::Remote(cur) => Ok(cur.prev_multiple().await?),
586        }
587    }
588
589    pub async fn set_lowerbound<Key, Value>(
590        &mut self,
591        key: &[u8],
592    ) -> Result<Option<(bool, Key, Value)>>
593    where
594        Key: TableObject,
595        Value: TableObject,
596    {
597        match self {
598            Self::Local(cur) => Ok(cur.set_lowerbound(key)?),
599            Self::Remote(cur) => Ok(cur.set_lowerbound(key.to_vec()).await?),
600        }
601    }
602
603    fn iter_to_stream<'cur, Key, Value>(
604        itr: crate::cursor::Iter<'cur, K, Key, Value>,
605    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
606    where
607        Key: TableObject + Send + 'cur,
608        Value: TableObject + Send + 'cur,
609    {
610        Box::pin(try_stream! {
611            for it in itr {
612                let (k, v) = it?;
613                yield (k, v);
614            }
615        })
616    }
617
618    fn intoiter_to_stream<'cur, Key, Value>(
619        itr: crate::cursor::IntoIter<'cur, K, Key, Value>,
620    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
621    where
622        Key: TableObject + Send + 'cur,
623        Value: TableObject + Send + 'cur,
624    {
625        Box::pin(try_stream! {
626            for it in itr {
627                let (k, v) = it?;
628                yield (k, v);
629            }
630        })
631    }
632
633    fn iterdup_to_steam<'cur, Key, Value>(
634        iterdup: crate::cursor::IterDup<'cur, K, Key, Value>,
635    ) -> Pin<
636        Box<
637            dyn Stream<
638                    Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>>,
639                > + Send
640                + 'cur,
641        >,
642    >
643    where
644        Key: TableObject + Send + 'cur,
645        Value: TableObject + Send + 'cur,
646    {
647        Box::pin(try_stream! {
648            for it in iterdup {
649                let st = Self::intoiter_to_stream(it);
650                yield st;
651            }
652        })
653    }
654
655    pub fn iter<'a, Key, Value>(
656        &'a mut self,
657    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
658    where
659        Key: TableObject + Send + 'a,
660        Value: TableObject + Send + 'a,
661    {
662        match self {
663            Self::Local(cur) => Self::iter_to_stream(cur.iter::<Key, Value>()),
664            Self::Remote(cur) => cur.iter(),
665        }
666    }
667
668    pub fn into_iter_buffered<'a, Key, Value>(
669        self,
670        buffer_config: BufferConfiguration,
671    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
672    where
673        Key: TableObject + Send + 'a,
674        Value: TableObject + Send + 'a,
675    {
676        match self {
677            Self::Local(mut cur) => Box::pin(try_stream! {
678                for it in cur.iter::<Key, Value>() {
679                    let (k, v) = it?;
680                    yield (k, v);
681                }
682            }),
683            Self::Remote(cur) => cur.into_iter_buffered(buffer_config),
684        }
685    }
686
687    pub fn iter_start<'a, Key, Value>(
688        &'a mut self,
689    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
690    where
691        Key: TableObject + Send + 'a,
692        Value: TableObject + Send + 'a,
693    {
694        match self {
695            Self::Local(cur) => Self::iter_to_stream(cur.iter_start::<Key, Value>()),
696            Self::Remote(cur) => cur.iter_start(),
697        }
698    }
699
700    pub fn into_iter_start_buffered<'a, Key, Value>(
701        self,
702        buffer_config: BufferConfiguration,
703    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
704    where
705        Key: TableObject + Send + 'a,
706        Value: TableObject + Send + 'a,
707    {
708        match self {
709            Self::Local(mut cur) => Box::pin(try_stream! {
710                for it in cur.iter_start::<Key, Value>() {
711                    let (k, v) = it?;
712                    yield (k, v);
713                }
714            }),
715            Self::Remote(cur) => cur.into_iter_start_buffered(buffer_config),
716        }
717    }
718
719    pub async fn iter_from<'a, Key, Value>(
720        &'a mut self,
721        key: &[u8],
722    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
723    where
724        Key: TableObject + Send + 'a,
725        Value: TableObject + Send + 'a,
726    {
727        Ok(match self {
728            Self::Local(cur) => Self::iter_to_stream(cur.iter_from::<Key, Value>(&key)),
729            Self::Remote(cur) => cur.iter_from(key.to_vec()).await?,
730        })
731    }
732
733    pub async fn into_iter_from_buffered<'a, Key, Value>(
734        self,
735        key: &'a [u8],
736        buffer_config: BufferConfiguration,
737    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
738    where
739        Key: TableObject + Send + 'a,
740        Value: TableObject + Send + 'a,
741    {
742        Ok(match self {
743            Self::Local(mut cur) => Box::pin(try_stream! {
744                for it in cur.iter_from::<Key, Value>(&key) {
745                    let (k, v) = it?;
746                    yield (k, v);
747                }
748            }),
749            Self::Remote(cur) => {
750                cur.into_iter_from_buffered(key.to_vec(), buffer_config)
751                    .await?
752            }
753        })
754    }
755
756    pub fn iter_dup<'a, Key, Value>(
757        &'a mut self,
758    ) -> Pin<
759        Box<
760            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
761                + Send
762                + 'a,
763        >,
764    >
765    where
766        Key: TableObject + Send + 'a,
767        Value: TableObject + Send + 'a,
768    {
769        match self {
770            Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup()),
771            Self::Remote(cur) => cur.iter_dup(),
772        }
773    }
774
775    pub fn into_iter_dup_buffered<'a, Key, Value>(
776        self,
777        buffer_config: BufferConfiguration,
778    ) -> Pin<
779        Box<
780            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
781                + Send
782                + 'a,
783        >,
784    >
785    where
786        Key: TableObject + Send + 'a,
787        Value: TableObject + Send + 'a,
788    {
789        match self {
790            Self::Local(cur) => Box::pin(try_stream! {
791                for it in cur.into_iter_dup() {
792                    let st = Self::intoiter_to_stream(it);
793                    yield st;
794                }
795            }),
796            Self::Remote(cur) => cur.into_iter_dup_buffered(buffer_config),
797        }
798    }
799
800    pub fn iter_dup_start<'a, Key, Value>(
801        &'a mut self,
802    ) -> Pin<
803        Box<
804            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
805                + Send
806                + 'a,
807        >,
808    >
809    where
810        Key: TableObject + Send + 'a,
811        Value: TableObject + Send + 'a,
812    {
813        match self {
814            Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_start()),
815            Self::Remote(cur) => cur.iter_dup_start(),
816        }
817    }
818
819    pub fn into_iter_dup_start_buffered<'a, Key, Value>(
820        self,
821        buffer_config: BufferConfiguration,
822    ) -> Pin<
823        Box<
824            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
825                + Send
826                + 'a,
827        >,
828    >
829    where
830        Key: TableObject + Send + 'a,
831        Value: TableObject + Send + 'a,
832    {
833        match self {
834            Self::Local(cur) => Box::pin(try_stream! {
835                for it in cur.into_iter_dup_start() {
836                    let st = Self::intoiter_to_stream(it);
837                    yield st;
838                }
839            }),
840            Self::Remote(cur) => cur.into_iter_dup_start_buffered(buffer_config),
841        }
842    }
843
844    pub async fn iter_dup_from<'a, Key, Value>(
845        &'a mut self,
846        key: &[u8],
847    ) -> Result<
848        Pin<
849            Box<
850                dyn Stream<
851                        Item = Result<
852                            Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
853                        >,
854                    > + Send
855                    + 'a,
856            >,
857        >,
858    >
859    where
860        Key: TableObject + Send + 'a,
861        Value: TableObject + Send + 'a,
862    {
863        Ok(match self {
864            Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_from(&key)),
865            Self::Remote(cur) => cur.iter_dup_from(key.to_vec()).await?,
866        })
867    }
868
869    pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
870        self,
871        key: &'a [u8],
872        buffer_config: BufferConfiguration,
873    ) -> Result<
874        Pin<
875            Box<
876                dyn Stream<
877                        Item = Result<
878                            Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
879                        >,
880                    > + Send
881                    + 'a,
882            >,
883        >,
884    >
885    where
886        Key: TableObject + Send + 'a,
887        Value: TableObject + Send + 'a,
888    {
889        Ok(match self {
890            Self::Local(mut cur) => Box::pin(try_stream! {
891                for it in cur.into_iter_dup_from(&key) {
892                    let st = Self::intoiter_to_stream(it);
893                    yield st;
894                }
895            }),
896            Self::Remote(cur) => {
897                cur.into_iter_dup_from_buffered(key.to_vec(), buffer_config)
898                    .await?
899            }
900        })
901    }
902
903    pub async fn iter_dup_of<'a, Key, Value>(
904        &'a mut self,
905        key: &[u8],
906    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
907    where
908        Key: TableObject + Send + 'a,
909        Value: TableObject + Send + 'a,
910    {
911        Ok(match self {
912            Self::Local(cur) => Self::iter_to_stream(cur.iter_dup_of(&key)),
913            Self::Remote(cur) => cur.iter_dup_of(key.to_vec()).await?,
914        })
915    }
916
917    pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
918        self,
919        key: &'a [u8],
920        buffer_config: BufferConfiguration,
921    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
922    where
923        Key: TableObject + Send + 'a,
924        Value: TableObject + Send + 'a,
925    {
926        Ok(match self {
927            Self::Local(mut cur) => Box::pin(try_stream! {
928                for it in cur.into_iter_dup_of(&key) {
929                    let (k, v) = it?;
930                    yield (k, v);
931                }
932            }),
933            Self::Remote(cur) => {
934                cur.into_iter_dup_of_buffered(key.to_vec(), buffer_config)
935                    .await?
936            }
937        })
938    }
939}
940
941impl CursorAny<RW> {
942    pub async fn put(&mut self, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
943        match self {
944            Self::Local(cur) => Ok(cur.put(key, data, flags)?),
945            Self::Remote(cur) => Ok(cur.put(key.to_vec(), data.to_vec(), flags).await?),
946        }
947    }
948
949    pub async fn del(&mut self, flags: WriteFlags) -> Result<()> {
950        match self {
951            Self::Local(cur) => Ok(cur.del(flags)?),
952            Self::Remote(cur) => Ok(cur.del(flags).await?),
953        }
954    }
955}