libmdbx_remote/
any.rs

1use std::{
2    collections::HashMap,
3    path::{Path, PathBuf},
4    pin::Pin,
5    time::Duration,
6    usize,
7};
8
9use async_stream::try_stream;
10use tokio_stream::Stream;
11
12use crate::{
13    remote::{
14        BufferConfiguration, ClientError, RemoteCursor, RemoteDatabase, RemoteEnvironment,
15        RemoteTransaction,
16    },
17    service::RemoteMDBXClient,
18    CommitLatency, Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder,
19    EnvironmentFlags, EnvironmentKind, Info, Mode, Stat, TableObject, Transaction, TransactionKind,
20    WriteFlags, RO, RW,
21};
22
23type Result<T> = std::result::Result<T, ClientError>;
24
25#[derive(Debug, Clone)]
26pub enum EnvironmentAny {
27    Local(Environment),
28    Remote(RemoteEnvironment),
29}
30
31impl EnvironmentAny {
32    pub fn open_local(path: &Path, builder: EnvironmentBuilder) -> Result<Self> {
33        let db = builder.open(path)?;
34
35        Ok(Self::Local(db))
36    }
37
38    pub async fn open_remote(
39        path: &Path,
40        builder: EnvironmentBuilder,
41        remote: String,
42        deadline: Duration,
43    ) -> Result<Self> {
44        let mut transport = tarpc::serde_transport::tcp::connect(
45            remote,
46            tarpc::tokio_serde::formats::Bincode::default,
47        );
48
49        transport.config_mut().max_frame_length(usize::MAX);
50
51        let transport = transport.await?;
52        let client = RemoteMDBXClient::new(tarpc::client::Config::default(), transport);
53        let env =
54            RemoteEnvironment::open_with_builder(path.to_path_buf(), builder, client, deadline)
55                .await?;
56        Ok(Self::Remote(env))
57    }
58
59    pub async fn open_with_defaults(url: &str, defaults: EnvironmentBuilder) -> Result<Self> {
60        if url.starts_with("mdbx") || url.starts_with("file") {
61            Self::open_url_with_defaults(url, defaults).await
62        } else {
63            Self::open_local(&PathBuf::from(url), defaults)
64        }
65    }
66
67    pub async fn open_url_with_defaults(url: &str, defaults: EnvironmentBuilder) -> Result<Self> {
68        let url = url::Url::parse(url).map_err(|e| ClientError::WrongURL(e))?;
69        let mut builder = defaults;
70
71        let args: HashMap<String, String> = url
72            .query_pairs()
73            .into_iter()
74            .map(|(k, v)| (k.into_owned(), v.into_owned()))
75            .collect();
76
77        let mode = if args.contains_key("ro") {
78            Mode::ReadOnly
79        } else if args.contains_key("rw") {
80            Mode::ReadWrite {
81                sync_mode: crate::SyncMode::Durable,
82            }
83        } else {
84            builder.flags.mode.clone()
85        };
86
87        let exclusive = if args.contains_key("exclusive") {
88            true
89        } else {
90            builder.flags.exclusive
91        };
92        let accede = if args.contains_key("accede") {
93            true
94        } else {
95            builder.flags.accede
96        };
97        let no_sub_dir = if args.contains_key("no_sub_dir") {
98            true
99        } else {
100            builder.flags.no_sub_dir
101        };
102        let flags = EnvironmentFlags {
103            mode,
104            exclusive,
105            accede,
106            no_sub_dir,
107            ..Default::default()
108        };
109
110        let max_readers = args
111            .get("max_readers")
112            .map(|t| u64::from_str_radix(&t, 10))
113            .transpose()
114            .map_err(|_| ClientError::ParseError)?
115            .or(builder.max_readers);
116        let max_dbs = args
117            .get("max_dbs")
118            .map(|t| usize::from_str_radix(&t, 10))
119            .transpose()
120            .map_err(|_| ClientError::ParseError)?
121            .or(builder.max_dbs.map(|t| t as usize));
122        let sync_bytes = args
123            .get("sync_bytes")
124            .map(|t| u64::from_str_radix(&t, 10))
125            .transpose()
126            .map_err(|_| ClientError::ParseError)?
127            .or(builder.sync_bytes);
128        let sync_period = args
129            .get("sync_period")
130            .map(|t| u64::from_str_radix(&t, 10))
131            .transpose()
132            .map_err(|_| ClientError::ParseError)?
133            .or(builder.sync_period);
134
135        builder.set_flags(flags);
136        if let Some(max_db) = max_dbs {
137            builder.set_max_dbs(max_db);
138        }
139
140        if let Some(max_readers) = max_readers {
141            builder.set_max_readers(max_readers);
142        }
143
144        if let Some(sync_bytes) = sync_bytes {
145            builder.set_sync_bytes(sync_bytes as usize);
146        }
147
148        if let Some(sync_period) = sync_period {
149            builder.set_sync_period(Duration::from_secs(sync_period));
150        }
151
152        let deadline = args
153            .get("deadline")
154            .map(|t| u64::from_str_radix(&t, 10))
155            .transpose()
156            .map_err(|_| ClientError::ParseError)?
157            .map(|t| Duration::from_secs(t))
158            .unwrap_or(Duration::from_secs(30));
159
160        match url.scheme() {
161            "file" => Self::open_local(&PathBuf::from(url.path()), builder),
162            "mdbx" => {
163                let fpath = PathBuf::from(url.path());
164                if let Some(host) = url.host_str() {
165                    let target = format!("{}:{}", host, url.port().unwrap_or(1899));
166
167                    Self::open_remote(&fpath, builder, target, deadline).await
168                } else {
169                    Self::open_local(&PathBuf::from(url.path()), builder)
170                }
171            }
172            _ => Err(ClientError::ParseError),
173        }
174    }
175
176    pub async fn open(url: &str) -> Result<Self> {
177        let mut defaults = Environment::builder();
178        defaults
179            .set_flags(EnvironmentFlags {
180                mode: Mode::ReadOnly,
181                ..Default::default()
182            })
183            .set_max_dbs(256)
184            .set_max_readers(256);
185        Self::open_with_defaults(url, defaults).await
186    }
187
188    pub async fn begin_ro_txn(&self) -> Result<TransactionAny<RO>> {
189        match self {
190            Self::Local(env) => {
191                let env = env.clone();
192                Ok(TransactionAny::Local(
193                    tokio::task::spawn_blocking(move || env.begin_ro_txn()).await??,
194                ))
195            }
196            Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_ro_txn().await?)),
197        }
198    }
199
200    pub async fn begin_rw_txn(&self) -> Result<TransactionAny<RW>> {
201        match self {
202            Self::Local(env) => {
203                let env = env.clone();
204                Ok(TransactionAny::Local(
205                    tokio::task::spawn_blocking(move || env.begin_rw_txn()).await??,
206                ))
207            }
208            Self::Remote(env) => Ok(TransactionAny::Remote(env.begin_rw_txn().await?)),
209        }
210    }
211
212    pub async fn sync(&self, force: bool) -> Result<bool> {
213        match self {
214            Self::Local(env) => {
215                let env = env.clone();
216                Ok(tokio::task::spawn_blocking(move || env.sync(force)).await??)
217            }
218            Self::Remote(env) => Ok(env.sync(force).await?),
219        }
220    }
221
222    pub async fn stat(&self) -> Result<Stat> {
223        match self {
224            Self::Local(env) => Ok(env.stat()?),
225            Self::Remote(env) => Ok(env.stat().await?),
226        }
227    }
228
229    pub async fn info(&self) -> Result<Info> {
230        match self {
231            Self::Local(env) => Ok(env.info()?),
232            Self::Remote(env) => Ok(env.info().await?),
233        }
234    }
235
236    pub fn env_kind(&self) -> EnvironmentKind {
237        match self {
238            Self::Local(env) => env.env_kind(),
239            Self::Remote(env) => env.env_kind(),
240        }
241    }
242
243    pub fn is_write_map(&self) -> bool {
244        self.env_kind().is_write_map()
245    }
246
247    pub async fn is_read_write(&self) -> Result<bool> {
248        Ok(!self.is_read_only().await?)
249    }
250
251    pub async fn is_read_only(&self) -> Result<bool> {
252        Ok(matches!(self.info().await?.mode(), Mode::ReadOnly))
253    }
254}
255
256#[derive(Debug)]
257pub enum DatabaseAny {
258    Local(Database),
259    Remote(RemoteDatabase),
260}
261
262impl DatabaseAny {
263    pub fn dbi(&self) -> u32 {
264        match self {
265            Self::Local(db) => db.dbi(),
266            Self::Remote(db) => db.dbi(),
267        }
268    }
269}
270
271#[derive(Debug, Clone)]
272pub enum TransactionAny<K: TransactionKind> {
273    Local(Transaction<K>),
274    Remote(RemoteTransaction<K>),
275}
276
277impl<K: TransactionKind> TransactionAny<K> {
278    pub async fn open_db(&self, db: Option<&str>) -> Result<DatabaseAny> {
279        match self {
280            Self::Local(tx) => {
281                let tx = tx.clone();
282                let db = db.map(|t| t.to_string());
283                Ok(DatabaseAny::Local(
284                    tokio::task::spawn_blocking(move || {
285                        tx.open_db(db.as_ref().map(|t| t.as_str()))
286                    })
287                    .await??,
288                ))
289            }
290            Self::Remote(tx) => Ok(DatabaseAny::Remote(
291                tx.open_db(db.map(|t| t.to_string())).await?,
292            )),
293        }
294    }
295
296    pub async fn get<V: TableObject>(&self, dbi: u32, key: &[u8]) -> Result<Option<V>> {
297        match self {
298            Self::Local(tx) => Ok(tx.get::<V>(dbi, key)?),
299            Self::Remote(tx) => Ok(tx.get::<V>(dbi, key.to_vec()).await?),
300        }
301    }
302
303    pub async fn db_stat(&self, db: &DatabaseAny) -> Result<Stat> {
304        self.db_stat_with_dbi(db.dbi()).await
305    }
306
307    pub async fn db_stat_with_dbi(&self, dbi: u32) -> Result<Stat> {
308        match self {
309            Self::Local(tx) => Ok(tx.db_stat_with_dbi(dbi)?),
310            Self::Remote(tx) => Ok(tx.db_stat_with_dbi(dbi).await?),
311        }
312    }
313
314    pub async fn cursor_with_dbi(&self, dbi: u32) -> Result<CursorAny<K>> {
315        match self {
316            Self::Local(tx) => Ok(CursorAny::Local(tx.cursor_with_dbi(dbi)?)),
317            Self::Remote(tx) => Ok(CursorAny::Remote(tx.cursor(dbi).await?)),
318        }
319    }
320
321    pub async fn cursor(&self, db: &DatabaseAny) -> Result<CursorAny<K>> {
322        self.cursor_with_dbi(db.dbi()).await
323    }
324}
325
326impl TransactionAny<RW> {
327    pub async fn begin_nested_txn(&mut self) -> Result<Self> {
328        match self {
329            Self::Local(tx) => Ok(Self::Local(tx.begin_nested_txn()?)),
330            Self::Remote(tx) => Ok(Self::Remote(tx.begin_nested_txn().await?)),
331        }
332    }
333
334    pub async fn clear_db(&self, dbi: u32) -> Result<()> {
335        match self {
336            Self::Local(tx) => Ok(tx.clear_db(dbi)?),
337            Self::Remote(tx) => Ok(tx.clear_db(dbi).await?),
338        }
339    }
340
341    pub async fn put(&self, dbi: u32, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
342        match self {
343            Self::Local(tx) => Ok(tx.put(dbi, key, data, flags)?),
344            Self::Remote(tx) => Ok(tx.put(dbi, key.to_vec(), data.to_vec(), flags).await?),
345        }
346    }
347
348    pub async fn del(&self, dbi: u32, key: &[u8], value: Option<&[u8]>) -> Result<bool> {
349        match self {
350            Self::Local(tx) => Ok(tx.del(dbi, key, value)?),
351            Self::Remote(tx) => Ok(tx.del(dbi, key.to_vec(), value.map(|t| t.to_vec())).await?),
352        }
353    }
354
355    pub async fn create_db(&self, db: Option<&str>, flags: DatabaseFlags) -> Result<DatabaseAny> {
356        match self {
357            Self::Local(tx) => {
358                let tx = tx.clone();
359                let db = db.map(|t| t.to_string());
360                Ok(DatabaseAny::Local(
361                    tokio::task::spawn_blocking(move || {
362                        tx.create_db(db.as_ref().map(|t| t.as_str()), flags)
363                    })
364                    .await??,
365                ))
366            }
367            Self::Remote(tx) => Ok(DatabaseAny::Remote(
368                tx.create_db(db.map(|t| t.to_string()), flags).await?,
369            )),
370        }
371    }
372
373    pub async fn commit(self) -> Result<(bool, CommitLatency)> {
374        match self {
375            Self::Local(tx) => Ok(tokio::task::spawn_blocking(move || tx.commit()).await??),
376            Self::Remote(tx) => Ok(tx.commit().await?),
377        }
378    }
379}
380
381#[derive(Debug)]
382pub enum CursorAny<K: TransactionKind> {
383    Local(Cursor<K>),
384    Remote(RemoteCursor<K>),
385}
386
387impl<K: TransactionKind> CursorAny<K> {
388    pub async fn cursor_clone(&self) -> Result<Self> {
389        match self {
390            Self::Local(cur) => Ok(Self::Local(cur.clone())),
391            Self::Remote(cur) => Ok(Self::Remote(cur.cur_clone().await?)),
392        }
393    }
394
395    pub async fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
396    where
397        Key: TableObject,
398        Value: TableObject,
399    {
400        match self {
401            Self::Local(cur) => Ok(cur.first()?),
402            Self::Remote(cur) => Ok(cur.first().await?),
403        }
404    }
405
406    pub async fn first_dup<Value>(&mut self) -> Result<Option<Value>>
407    where
408        Value: TableObject,
409    {
410        match self {
411            Self::Local(cur) => Ok(cur.first_dup()?),
412            Self::Remote(cur) => Ok(cur.first_dup().await?),
413        }
414    }
415
416    pub async fn get_both<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
417    where
418        Value: TableObject,
419    {
420        match self {
421            Self::Local(cur) => Ok(cur.get_both(k, v)?),
422            Self::Remote(cur) => Ok(cur.get_both(k.to_vec(), v.to_vec()).await?),
423        }
424    }
425
426    pub async fn get_both_range<Value>(&mut self, k: &[u8], v: &[u8]) -> Result<Option<Value>>
427    where
428        Value: TableObject,
429    {
430        match self {
431            Self::Local(cur) => Ok(cur.get_both_range(k, v)?),
432            Self::Remote(cur) => Ok(cur.get_both_range(k.to_vec(), v.to_vec()).await?),
433        }
434    }
435
436    pub async fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
437    where
438        Key: TableObject,
439        Value: TableObject,
440    {
441        match self {
442            Self::Local(cur) => Ok(cur.get_current()?),
443            Self::Remote(cur) => Ok(cur.get_current().await?),
444        }
445    }
446
447    pub async fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
448    where
449        Value: TableObject,
450    {
451        match self {
452            Self::Local(cur) => Ok(cur.get_multiple()?),
453            Self::Remote(cur) => Ok(cur.get_multiple().await?),
454        }
455    }
456
457    pub async fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
458    where
459        Key: TableObject,
460        Value: TableObject,
461    {
462        match self {
463            Self::Local(cur) => Ok(cur.last()?),
464            Self::Remote(cur) => Ok(cur.last().await?),
465        }
466    }
467
468    pub async fn last_dup<Value>(&mut self) -> Result<Option<Value>>
469    where
470        Value: TableObject,
471    {
472        match self {
473            Self::Local(cur) => Ok(cur.last_dup()?),
474            Self::Remote(cur) => Ok(cur.last_dup().await?),
475        }
476    }
477
478    pub async fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
479    where
480        Key: TableObject,
481        Value: TableObject,
482    {
483        match self {
484            Self::Local(cur) => Ok(cur.next()?),
485            Self::Remote(cur) => Ok(cur.next().await?),
486        }
487    }
488
489    pub async fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
490    where
491        Key: TableObject,
492        Value: TableObject,
493    {
494        match self {
495            Self::Local(cur) => Ok(cur.next_dup()?),
496            Self::Remote(cur) => Ok(cur.next_dup().await?),
497        }
498    }
499
500    pub async fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
501    where
502        Key: TableObject,
503        Value: TableObject,
504    {
505        match self {
506            Self::Local(cur) => Ok(cur.next_multiple()?),
507            Self::Remote(cur) => Ok(cur.next_multiple().await?),
508        }
509    }
510
511    pub async fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
512    where
513        Key: TableObject,
514        Value: TableObject,
515    {
516        match self {
517            Self::Local(cur) => Ok(cur.next_nodup()?),
518            Self::Remote(cur) => Ok(cur.next_nodup().await?),
519        }
520    }
521
522    pub async fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
523    where
524        Key: TableObject,
525        Value: TableObject,
526    {
527        match self {
528            Self::Local(cur) => Ok(cur.prev()?),
529            Self::Remote(cur) => Ok(cur.prev().await?),
530        }
531    }
532
533    pub async fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
534    where
535        Key: TableObject,
536        Value: TableObject,
537    {
538        match self {
539            Self::Local(cur) => Ok(cur.prev_dup()?),
540            Self::Remote(cur) => Ok(cur.prev_dup().await?),
541        }
542    }
543
544    pub async fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
545    where
546        Key: TableObject,
547        Value: TableObject,
548    {
549        match self {
550            Self::Local(cur) => Ok(cur.prev_nodup()?),
551            Self::Remote(cur) => Ok(cur.prev_nodup().await?),
552        }
553    }
554
555    pub async fn set<Value>(&mut self, key: &[u8]) -> Result<Option<Value>>
556    where
557        Value: TableObject,
558    {
559        match self {
560            Self::Local(cur) => Ok(cur.set(key)?),
561            Self::Remote(cur) => Ok(cur.set(key.to_vec()).await?),
562        }
563    }
564    pub async fn set_key<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
565    where
566        Key: TableObject,
567        Value: TableObject,
568    {
569        match self {
570            Self::Local(cur) => Ok(cur.set_key(key)?),
571            Self::Remote(cur) => Ok(cur.set_key(key.to_vec()).await?),
572        }
573    }
574
575    pub async fn set_range<Key, Value>(&mut self, key: &[u8]) -> Result<Option<(Key, Value)>>
576    where
577        Key: TableObject,
578        Value: TableObject,
579    {
580        match self {
581            Self::Local(cur) => Ok(cur.set_range(key)?),
582            Self::Remote(cur) => Ok(cur.set_range(key.to_vec()).await?),
583        }
584    }
585
586    pub async fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
587    where
588        Key: TableObject,
589        Value: TableObject,
590    {
591        match self {
592            Self::Local(cur) => Ok(cur.prev_multiple()?),
593            Self::Remote(cur) => Ok(cur.prev_multiple().await?),
594        }
595    }
596
597    pub async fn set_lowerbound<Key, Value>(
598        &mut self,
599        key: &[u8],
600    ) -> Result<Option<(bool, Key, Value)>>
601    where
602        Key: TableObject,
603        Value: TableObject,
604    {
605        match self {
606            Self::Local(cur) => Ok(cur.set_lowerbound(key)?),
607            Self::Remote(cur) => Ok(cur.set_lowerbound(key.to_vec()).await?),
608        }
609    }
610
611    fn iter_to_stream<'cur, Key, Value>(
612        itr: crate::cursor::Iter<'cur, K, Key, Value>,
613    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
614    where
615        Key: TableObject + Send + 'cur,
616        Value: TableObject + Send + 'cur,
617    {
618        Box::pin(try_stream! {
619            for it in itr {
620                let (k, v) = it?;
621                yield (k, v);
622            }
623        })
624    }
625
626    fn intoiter_to_stream<'cur, Key, Value>(
627        itr: crate::cursor::IntoIter<'cur, K, Key, Value>,
628    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>
629    where
630        Key: TableObject + Send + 'cur,
631        Value: TableObject + Send + 'cur,
632    {
633        Box::pin(try_stream! {
634            for it in itr {
635                let (k, v) = it?;
636                yield (k, v);
637            }
638        })
639    }
640
641    fn iterdup_to_steam<'cur, Key, Value>(
642        iterdup: crate::cursor::IterDup<'cur, K, Key, Value>,
643    ) -> Pin<
644        Box<
645            dyn Stream<
646                    Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'cur>>>,
647                > + Send
648                + 'cur,
649        >,
650    >
651    where
652        Key: TableObject + Send + 'cur,
653        Value: TableObject + Send + 'cur,
654    {
655        Box::pin(try_stream! {
656            for it in iterdup {
657                let st = Self::intoiter_to_stream(it);
658                yield st;
659            }
660        })
661    }
662
663    pub fn iter<'a, Key, Value>(
664        &'a mut self,
665    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
666    where
667        Key: TableObject + Send + 'a,
668        Value: TableObject + Send + 'a,
669    {
670        match self {
671            Self::Local(cur) => Self::iter_to_stream(cur.iter::<Key, Value>()),
672            Self::Remote(cur) => cur.iter(),
673        }
674    }
675
676    pub fn into_iter_buffered<'a, Key, Value>(
677        self,
678        buffer_config: BufferConfiguration,
679    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
680    where
681        Key: TableObject + Send + 'a,
682        Value: TableObject + Send + 'a,
683    {
684        match self {
685            Self::Local(mut cur) => Box::pin(try_stream! {
686                for it in cur.iter::<Key, Value>() {
687                    let (k, v) = it?;
688                    yield (k, v);
689                }
690            }),
691            Self::Remote(cur) => cur.into_iter_buffered(buffer_config),
692        }
693    }
694
695    pub fn iter_start<'a, Key, Value>(
696        &'a mut self,
697    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
698    where
699        Key: TableObject + Send + 'a,
700        Value: TableObject + Send + 'a,
701    {
702        match self {
703            Self::Local(cur) => Self::iter_to_stream(cur.iter_start::<Key, Value>()),
704            Self::Remote(cur) => cur.iter_start(),
705        }
706    }
707
708    pub fn into_iter_start_buffered<'a, Key, Value>(
709        self,
710        buffer_config: BufferConfiguration,
711    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
712    where
713        Key: TableObject + Send + 'a,
714        Value: TableObject + Send + 'a,
715    {
716        match self {
717            Self::Local(mut cur) => Box::pin(try_stream! {
718                for it in cur.iter_start::<Key, Value>() {
719                    let (k, v) = it?;
720                    yield (k, v);
721                }
722            }),
723            Self::Remote(cur) => cur.into_iter_start_buffered(buffer_config),
724        }
725    }
726
727    pub async fn iter_from<'a, Key, Value>(
728        &'a mut self,
729        key: &[u8],
730    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
731    where
732        Key: TableObject + Send + 'a,
733        Value: TableObject + Send + 'a,
734    {
735        Ok(match self {
736            Self::Local(cur) => Self::iter_to_stream(cur.iter_from::<Key, Value>(&key)),
737            Self::Remote(cur) => cur.iter_from(key.to_vec()).await?,
738        })
739    }
740
741    pub async fn into_iter_from_buffered<'a, Key, Value>(
742        self,
743        key: &'a [u8],
744        buffer_config: BufferConfiguration,
745    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
746    where
747        Key: TableObject + Send + 'a,
748        Value: TableObject + Send + 'a,
749    {
750        Ok(match self {
751            Self::Local(mut cur) => Box::pin(try_stream! {
752                for it in cur.iter_from::<Key, Value>(&key) {
753                    let (k, v) = it?;
754                    yield (k, v);
755                }
756            }),
757            Self::Remote(cur) => {
758                cur.into_iter_from_buffered(key.to_vec(), buffer_config)
759                    .await?
760            }
761        })
762    }
763
764    pub fn iter_dup<'a, Key, Value>(
765        &'a mut self,
766    ) -> Pin<
767        Box<
768            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
769                + Send
770                + 'a,
771        >,
772    >
773    where
774        Key: TableObject + Send + 'a,
775        Value: TableObject + Send + 'a,
776    {
777        match self {
778            Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup()),
779            Self::Remote(cur) => cur.iter_dup(),
780        }
781    }
782
783    pub fn into_iter_dup_buffered<'a, Key, Value>(
784        self,
785        buffer_config: BufferConfiguration,
786    ) -> Pin<
787        Box<
788            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
789                + Send
790                + 'a,
791        >,
792    >
793    where
794        Key: TableObject + Send + 'a,
795        Value: TableObject + Send + 'a,
796    {
797        match self {
798            Self::Local(cur) => Box::pin(try_stream! {
799                for it in cur.into_iter_dup() {
800                    let st = Self::intoiter_to_stream(it);
801                    yield st;
802                }
803            }),
804            Self::Remote(cur) => cur.into_iter_dup_buffered(buffer_config),
805        }
806    }
807
808    pub fn iter_dup_start<'a, Key, Value>(
809        &'a mut self,
810    ) -> Pin<
811        Box<
812            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
813                + Send
814                + 'a,
815        >,
816    >
817    where
818        Key: TableObject + Send + 'a,
819        Value: TableObject + Send + 'a,
820    {
821        match self {
822            Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_start()),
823            Self::Remote(cur) => cur.iter_dup_start(),
824        }
825    }
826
827    pub fn into_iter_dup_start_buffered<'a, Key, Value>(
828        self,
829        buffer_config: BufferConfiguration,
830    ) -> Pin<
831        Box<
832            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
833                + Send
834                + 'a,
835        >,
836    >
837    where
838        Key: TableObject + Send + 'a,
839        Value: TableObject + Send + 'a,
840    {
841        match self {
842            Self::Local(cur) => Box::pin(try_stream! {
843                for it in cur.into_iter_dup_start() {
844                    let st = Self::intoiter_to_stream(it);
845                    yield st;
846                }
847            }),
848            Self::Remote(cur) => cur.into_iter_dup_start_buffered(buffer_config),
849        }
850    }
851
852    pub async fn iter_dup_from<'a, Key, Value>(
853        &'a mut self,
854        key: &[u8],
855    ) -> Result<
856        Pin<
857            Box<
858                dyn Stream<
859                        Item = Result<
860                            Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
861                        >,
862                    > + Send
863                    + 'a,
864            >,
865        >,
866    >
867    where
868        Key: TableObject + Send + 'a,
869        Value: TableObject + Send + 'a,
870    {
871        Ok(match self {
872            Self::Local(cur) => Self::iterdup_to_steam(cur.iter_dup_from(&key)),
873            Self::Remote(cur) => cur.iter_dup_from(key.to_vec()).await?,
874        })
875    }
876
877    pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
878        self,
879        key: &'a [u8],
880        buffer_config: BufferConfiguration,
881    ) -> Result<
882        Pin<
883            Box<
884                dyn Stream<
885                        Item = Result<
886                            Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
887                        >,
888                    > + Send
889                    + 'a,
890            >,
891        >,
892    >
893    where
894        Key: TableObject + Send + 'a,
895        Value: TableObject + Send + 'a,
896    {
897        Ok(match self {
898            Self::Local(mut cur) => Box::pin(try_stream! {
899                for it in cur.into_iter_dup_from(&key) {
900                    let st = Self::intoiter_to_stream(it);
901                    yield st;
902                }
903            }),
904            Self::Remote(cur) => {
905                cur.into_iter_dup_from_buffered(key.to_vec(), buffer_config)
906                    .await?
907            }
908        })
909    }
910
911    pub async fn iter_dup_of<'a, Key, Value>(
912        &'a mut self,
913        key: &[u8],
914    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
915    where
916        Key: TableObject + Send + 'a,
917        Value: TableObject + Send + 'a,
918    {
919        Ok(match self {
920            Self::Local(cur) => Self::iter_to_stream(cur.iter_dup_of(&key)),
921            Self::Remote(cur) => cur.iter_dup_of(key.to_vec()).await?,
922        })
923    }
924
925    pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
926        self,
927        key: &'a [u8],
928        buffer_config: BufferConfiguration,
929    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
930    where
931        Key: TableObject + Send + 'a,
932        Value: TableObject + Send + 'a,
933    {
934        Ok(match self {
935            Self::Local(mut cur) => Box::pin(try_stream! {
936                for it in cur.into_iter_dup_of(&key) {
937                    let (k, v) = it?;
938                    yield (k, v);
939                }
940            }),
941            Self::Remote(cur) => {
942                cur.into_iter_dup_of_buffered(key.to_vec(), buffer_config)
943                    .await?
944            }
945        })
946    }
947}
948
949impl CursorAny<RW> {
950    pub async fn put(&mut self, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
951        match self {
952            Self::Local(cur) => Ok(cur.put(key, data, flags)?),
953            Self::Remote(cur) => Ok(cur.put(key.to_vec(), data.to_vec(), flags).await?),
954        }
955    }
956
957    pub async fn del(&mut self, flags: WriteFlags) -> Result<()> {
958        match self {
959            Self::Local(cur) => Ok(cur.del(flags)?),
960            Self::Remote(cur) => Ok(cur.del(flags).await?),
961        }
962    }
963}