libmdbx_remote/
remote.rs

1use std::{
2    future::Future,
3    marker::PhantomData,
4    path::PathBuf,
5    pin::Pin,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10    time::Duration,
11};
12
13use async_stream::try_stream;
14use ffi::{
15    MDBX_FIRST, MDBX_FIRST_DUP, MDBX_GET_BOTH, MDBX_GET_BOTH_RANGE, MDBX_GET_CURRENT,
16    MDBX_GET_MULTIPLE, MDBX_LAST, MDBX_LAST_DUP, MDBX_NEXT, MDBX_NEXT_DUP, MDBX_NEXT_MULTIPLE,
17    MDBX_NEXT_NODUP, MDBX_PREV, MDBX_PREV_DUP, MDBX_PREV_MULTIPLE, MDBX_PREV_NODUP, MDBX_SET,
18    MDBX_SET_KEY, MDBX_SET_LOWERBOUND, MDBX_SET_RANGE,
19};
20use tarpc::{client::NewClient, context::Context};
21use thiserror::Error;
22use tokio::{runtime::Handle, sync::oneshot};
23use tokio_stream::Stream;
24
25use crate::{
26    environment::RemoteEnvironmentConfig,
27    service::{RemoteMDBXClient, ServerError},
28    CommitLatency, DatabaseFlags, EnvironmentBuilder, EnvironmentKind, Info, Mode, Stat,
29    TableObject, TransactionKind, WriteFlags, RO, RW,
30};
31
32macro_rules! mdbx_try_optional {
33    ($expr:expr) => {{
34        match $expr {
35            Err(ClientError::MDBX(crate::error::Error::NotFound | crate::error::Error::NoData)) => {
36                return Ok(None)
37            }
38            Err(e) => return Err(e),
39            Ok(v) => v,
40        }
41    }};
42}
43
44fn escape_to_async<F, O>(fut: F) -> O
45where
46    F: Future<Output = O>,
47{
48    match Handle::try_current() {
49        Ok(handle) => tokio::task::block_in_place(move || handle.block_on(fut)),
50        Err(_) => tokio::runtime::Builder::new_current_thread()
51            .enable_all()
52            .build()
53            .unwrap()
54            .block_on(fut),
55    }
56}
57
58fn context_deadline(duration: Duration) -> Context {
59    let mut ctx = Context::current();
60    ctx.deadline = std::time::SystemTime::now() + duration;
61    ctx
62}
63
64#[derive(Debug, Clone, Copy)]
65pub struct BufferConfiguration {
66    pub max_count: u64,
67    pub max_buffer_bytes: u64,
68}
69
70impl Default for BufferConfiguration {
71    fn default() -> Self {
72        Self {
73            max_count: 8192,
74            max_buffer_bytes: 16 * 1024 * 1024,
75        }
76    }
77}
78
79impl BufferConfiguration {
80    pub fn new(count: u64, bytes: u64) -> Self {
81        Self {
82            max_count: count,
83            max_buffer_bytes: bytes,
84        }
85    }
86
87    pub fn max_count(self, count: u64) -> Self {
88        Self::new(count, self.max_buffer_bytes)
89    }
90
91    pub fn max_buffer_bytes(self, bytes: u64) -> Self {
92        Self::new(self.max_count, bytes)
93    }
94}
95
96#[derive(Debug, Error)]
97pub enum ClientError {
98    #[error("MDBX error: {0}")]
99    MDBX(crate::error::Error),
100    #[error("RPC error: {0}")]
101    RPC(tarpc::client::RpcError),
102    #[error("parse error")]
103    ParseError,
104    #[error("url parse error: {0}")]
105    WrongURL(url::ParseError),
106    #[error("IO error: {0}")]
107    IO(std::io::Error),
108    #[error("Server error: {0}")]
109    Server(ServerError),
110    #[error("Runtime error: {0}")]
111    Runtime(tokio::task::JoinError),
112}
113
114impl From<tokio::task::JoinError> for ClientError {
115    fn from(value: tokio::task::JoinError) -> Self {
116        Self::Runtime(value)
117    }
118}
119
120impl From<std::io::Error> for ClientError {
121    fn from(value: std::io::Error) -> Self {
122        Self::IO(value)
123    }
124}
125
126impl From<tarpc::client::RpcError> for ClientError {
127    fn from(value: tarpc::client::RpcError) -> Self {
128        Self::RPC(value)
129    }
130}
131
132impl From<crate::error::Error> for ClientError {
133    fn from(value: crate::error::Error) -> Self {
134        Self::MDBX(value)
135    }
136}
137
138impl From<ServerError> for ClientError {
139    fn from(value: ServerError) -> Self {
140        match value {
141            ServerError::MBDX(e) => Self::MDBX(e),
142            _ => Self::Server(value),
143        }
144    }
145}
146
147type Result<T> = std::result::Result<T, ClientError>;
148
149#[derive(Debug)]
150pub(crate) struct RemoteEnvironmentInner {
151    handle: u64,
152    cl: RemoteMDBXClient,
153    deadline: Duration,
154    kind: EnvironmentKind,
155    ch: oneshot::Sender<()>,
156}
157
158impl RemoteEnvironmentInner {
159    fn context(&self) -> Context {
160        context_deadline(self.deadline)
161    }
162
163    async fn new<D, E>(
164        path: PathBuf,
165        builder: EnvironmentBuilder,
166        client: RemoteMDBXClient,
167        dispatcher: D,
168        deadline: Duration,
169    ) -> Result<Self>
170    where
171        D: Future<Output = std::result::Result<(), E>> + Send + 'static,
172        E: std::error::Error + Send + Sync + 'static,
173    {
174        // Spin dispatcher
175        let (tx, rx) = oneshot::channel();
176        tokio::spawn(async move {
177            tokio::select! {
178                _ = rx => {
179                    tracing::info!("Exiting from dispatcher due to closer");
180                },
181                e = dispatcher => {
182                    tracing::warn!("Dispatcher exits with {:?}", e);
183                }
184            }
185            tracing::debug!("Dispatcher dies");
186        });
187        let remote = RemoteEnvironmentConfig::from(builder);
188        let kind = remote.env_kind();
189        let handle = client
190            .open_env(context_deadline(deadline), path, remote)
191            .await??;
192        Ok(Self {
193            handle: handle,
194            cl: client,
195            deadline: deadline,
196            kind: kind,
197            ch: tx,
198        })
199    }
200
201    async fn close(&self) -> Result<()> {
202        self.cl.env_close(self.context(), self.handle).await??;
203        Ok(())
204    }
205}
206
207impl Drop for RemoteEnvironmentInner {
208    fn drop(&mut self) {
209        if let Err(e) = escape_to_async(self.close()) {
210            tracing::warn!("Fail to close env {} due to {}", self.handle, e);
211        }
212        let (mut t, _) = oneshot::channel();
213        std::mem::swap(&mut self.ch, &mut t); // ???
214        if let Err(_) = t.send(()) {
215            tracing::warn!("Fail to close dispatcher");
216        }
217    }
218}
219
220#[derive(Clone, Debug)]
221pub struct RemoteEnvironment {
222    inner: Arc<RemoteEnvironmentInner>,
223}
224
225impl RemoteEnvironment {
226    pub async fn open_with_builder<D, E>(
227        path: PathBuf,
228        builder: EnvironmentBuilder,
229        client: NewClient<RemoteMDBXClient, D>,
230        deadline: Duration,
231    ) -> Result<Self>
232    where
233        D: Future<Output = std::result::Result<(), E>> + Send + 'static,
234        E: std::error::Error + Send + Sync + 'static,
235    {
236        Ok(Self {
237            inner: Arc::new(
238                RemoteEnvironmentInner::new(
239                    path,
240                    builder,
241                    client.client,
242                    client.dispatch,
243                    deadline,
244                )
245                .await?,
246            ),
247        })
248    }
249
250    pub async fn begin_ro_txn(&self) -> Result<RemoteTransaction<RO>> {
251        let tx = self
252            .inner
253            .cl
254            .env_ro_tx(self.inner.context(), self.inner.handle)
255            .await??;
256        Ok(RemoteTransaction::new(self.inner.clone(), tx))
257    }
258
259    pub async fn begin_rw_txn(&self) -> Result<RemoteTransaction<RW>> {
260        let tx = self
261            .inner
262            .cl
263            .env_rw_tx(self.inner.context(), self.inner.handle)
264            .await??;
265        Ok(RemoteTransaction::new(self.inner.clone(), tx))
266    }
267
268    pub async fn sync(&self, force: bool) -> Result<bool> {
269        Ok(self
270            .inner
271            .cl
272            .env_sync(self.inner.context(), self.inner.handle, force)
273            .await??)
274    }
275
276    pub async fn stat(&self) -> Result<Stat> {
277        Ok(self
278            .inner
279            .cl
280            .env_stat(self.inner.context(), self.inner.handle)
281            .await??)
282    }
283
284    pub async fn info(&self) -> Result<Info> {
285        Ok(self
286            .inner
287            .cl
288            .env_info(self.inner.context(), self.inner.handle)
289            .await??)
290    }
291
292    pub fn env_kind(&self) -> EnvironmentKind {
293        self.inner.kind
294    }
295
296    pub fn is_write_map(&self) -> bool {
297        self.inner.kind.is_write_map()
298    }
299
300    pub async fn is_read_write(&self) -> Result<bool> {
301        Ok(!self.is_read_only().await?)
302    }
303
304    pub async fn is_read_only(&self) -> Result<bool> {
305        Ok(matches!(self.info().await?.mode(), Mode::ReadOnly))
306    }
307}
308
309#[derive(Debug, Clone)]
310struct RemoteDatabaseInner {
311    dbi: u32,
312    _env: Arc<RemoteEnvironmentInner>,
313}
314
315#[derive(Debug, Clone)]
316pub struct RemoteDatabase {
317    inner: Arc<RemoteDatabaseInner>,
318}
319
320impl RemoteDatabase {
321    pub fn dbi(&self) -> u32 {
322        self.inner.dbi
323    }
324}
325
326#[derive(Debug, Clone)]
327struct RemoteTransactionInner<K: TransactionKind> {
328    env: Arc<RemoteEnvironmentInner>,
329    handle: u64,
330    committed: Arc<AtomicBool>,
331    _ph: PhantomData<K>,
332}
333
334impl<K: TransactionKind> RemoteTransactionInner<K> {
335    async fn close(&self) -> Result<()> {
336        let commited = self.committed.load(Ordering::SeqCst);
337        if !commited && !K::IS_READ_ONLY {
338            self.env
339                .cl
340                .tx_abort(self.env.context(), self.env.handle, self.handle)
341                .await??;
342        }
343        Ok(())
344    }
345}
346
347impl<K: TransactionKind> Drop for RemoteTransactionInner<K> {
348    fn drop(&mut self) {
349        if let Err(e) = escape_to_async(self.close()) {
350            tracing::warn!("Fail to close tx {} due to {}", self.handle, e);
351        }
352    }
353}
354
355#[derive(Debug, Clone)]
356pub struct RemoteTransaction<K: TransactionKind> {
357    inner: Arc<RemoteTransactionInner<K>>,
358}
359
360impl<K: TransactionKind> RemoteTransaction<K> {
361    pub(crate) fn new(env: Arc<RemoteEnvironmentInner>, handle: u64) -> Self {
362        Self {
363            inner: Arc::new(RemoteTransactionInner {
364                env: env,
365                handle: handle,
366                committed: Arc::new(AtomicBool::new(false)),
367                _ph: PhantomData::default(),
368            }),
369        }
370    }
371
372    async fn open_db_with_flags(
373        &self,
374        db: Option<String>,
375        flags: DatabaseFlags,
376    ) -> Result<RemoteDatabase> {
377        let dbi = self
378            .inner
379            .env
380            .cl
381            .tx_create_db(
382                self.inner.env.context(),
383                self.inner.env.handle,
384                self.inner.handle,
385                db,
386                flags.bits(),
387            )
388            .await??;
389
390        Ok(RemoteDatabase {
391            inner: Arc::new(RemoteDatabaseInner {
392                dbi: dbi,
393                _env: self.inner.env.clone(),
394            }),
395        })
396    }
397
398    pub async fn db_stat_with_dbi(&self, dbi: u32) -> Result<Stat> {
399        let stat = self
400            .inner
401            .env
402            .cl
403            .tx_db_stat(
404                self.inner.env.context(),
405                self.inner.env.handle,
406                self.inner.handle,
407                dbi,
408            )
409            .await??;
410
411        Ok(stat)
412    }
413
414    pub async fn db_stat(&self, db: &RemoteDatabase) -> Result<Stat> {
415        self.db_stat_with_dbi(db.dbi()).await
416    }
417
418    pub async fn open_db(&self, db: Option<String>) -> Result<RemoteDatabase> {
419        self.open_db_with_flags(db, DatabaseFlags::empty()).await
420    }
421
422    pub async fn get<V: TableObject>(&self, dbi: u32, key: Vec<u8>) -> Result<Option<V>> {
423        let v = self
424            .inner
425            .env
426            .cl
427            .tx_get(
428                self.inner.env.context(),
429                self.inner.env.handle,
430                self.inner.handle,
431                dbi,
432                key,
433            )
434            .await??;
435
436        v.map(|t| V::decode(&t)).transpose().map_err(|e| e.into())
437    }
438
439    pub async fn cursor(&self, dbi: u32) -> Result<RemoteCursor<K>> {
440        let cur = if K::IS_READ_ONLY {
441            self.inner
442                .env
443                .cl
444                .tx_ro_cursor(
445                    self.inner.env.context(),
446                    self.inner.env.handle,
447                    self.inner.handle,
448                    dbi,
449                )
450                .await??
451        } else {
452            self.inner
453                .env
454                .cl
455                .tx_rw_cursor(
456                    self.inner.env.context(),
457                    self.inner.env.handle,
458                    self.inner.handle,
459                    dbi,
460                )
461                .await??
462        };
463
464        Ok(RemoteCursor {
465            inner: Arc::new(RemoteCursorInner {
466                tx: self.inner.clone(),
467                handle: cur,
468            }),
469        })
470    }
471}
472
473impl RemoteTransaction<RW> {
474    pub async fn begin_nested_txn(&mut self) -> Result<Self> {
475        let handle = self
476            .inner
477            .env
478            .cl
479            .tx_nested(
480                self.inner.env.context(),
481                self.inner.env.handle,
482                self.inner.handle,
483            )
484            .await??;
485        Ok(Self::new(self.inner.env.clone(), handle))
486    }
487
488    pub async fn clear_db(&self, dbi: u32) -> Result<()> {
489        self.inner
490            .env
491            .cl
492            .clear_db(
493                self.inner.env.context(),
494                self.inner.env.handle,
495                self.inner.handle,
496                dbi,
497            )
498            .await??;
499
500        Ok(())
501    }
502
503    pub async fn put(
504        &self,
505        dbi: u32,
506        key: Vec<u8>,
507        data: Vec<u8>,
508        flags: WriteFlags,
509    ) -> Result<()> {
510        self.inner
511            .env
512            .cl
513            .tx_put(
514                self.inner.env.context(),
515                self.inner.env.handle,
516                self.inner.handle,
517                dbi,
518                key,
519                data,
520                flags.bits(),
521            )
522            .await??;
523        Ok(())
524    }
525
526    pub async fn del(&self, dbi: u32, key: Vec<u8>, value: Option<Vec<u8>>) -> Result<bool> {
527        let ret = self
528            .inner
529            .env
530            .cl
531            .tx_del(
532                self.inner.env.context(),
533                self.inner.env.handle,
534                self.inner.handle,
535                dbi,
536                key,
537                value,
538            )
539            .await??;
540        Ok(ret)
541    }
542
543    pub async fn create_db(
544        &self,
545        db: Option<String>,
546        flags: DatabaseFlags,
547    ) -> Result<RemoteDatabase> {
548        self.open_db_with_flags(db, flags | DatabaseFlags::CREATE)
549            .await
550    }
551
552    pub async fn commit(self) -> Result<(bool, CommitLatency)> {
553        tracing::debug!("going to commit tx {}", self.inner.handle);
554        let (ret, lat) = self
555            .inner
556            .env
557            .cl
558            .tx_commit(
559                self.inner.env.context(),
560                self.inner.env.handle,
561                self.inner.handle,
562            )
563            .await??;
564        self.inner.committed.store(true, Ordering::SeqCst);
565        Ok((ret, lat))
566    }
567}
568
569#[derive(Debug, Clone)]
570struct RemoteCursorInner<K: TransactionKind> {
571    tx: Arc<RemoteTransactionInner<K>>,
572    handle: u64,
573}
574
575impl<K: TransactionKind> RemoteCursorInner<K> {
576    async fn close(&self) -> Result<()> {
577        self.tx
578            .env
579            .cl
580            .cur_close(
581                self.tx.env.context(),
582                self.tx.env.handle,
583                self.tx.handle,
584                self.handle,
585            )
586            .await??;
587        Ok(())
588    }
589
590    async fn cur_clone(&self) -> Result<Self> {
591        let new_cur = self
592            .tx
593            .env
594            .cl
595            .cur_create(
596                self.tx.env.context(),
597                self.tx.env.handle,
598                self.tx.handle,
599                self.handle,
600            )
601            .await??;
602        Ok(Self {
603            tx: self.tx.clone(),
604            handle: new_cur,
605        })
606    }
607}
608
609impl<K: TransactionKind> Drop for RemoteCursorInner<K> {
610    fn drop(&mut self) {
611        if let Err(e) = escape_to_async(self.close()) {
612            tracing::warn!(
613                "Fail to close cursor {} of tx {} from env {} due to {}",
614                self.handle,
615                self.tx.handle,
616                self.tx.env.handle,
617                e
618            );
619        }
620    }
621}
622
623#[derive(Debug)]
624pub struct RemoteCursor<K: TransactionKind> {
625    inner: Arc<RemoteCursorInner<K>>,
626}
627
628impl<K: TransactionKind> RemoteCursor<K> {
629    pub async fn cur_clone(&self) -> Result<Self> {
630        Ok(Self {
631            inner: Arc::new(self.inner.cur_clone().await?),
632        })
633    }
634
635    async fn get<Key: TableObject, V: TableObject>(
636        &self,
637        key: Option<Vec<u8>>,
638        data: Option<Vec<u8>>,
639        op: u32,
640    ) -> Result<(Option<Key>, V, bool)> {
641        let tp = self
642            .inner
643            .tx
644            .env
645            .cl
646            .cur_get(
647                self.inner.tx.env.context(),
648                self.inner.tx.env.handle,
649                self.inner.tx.handle,
650                self.inner.handle,
651                key,
652                data,
653                op,
654            )
655            .await??;
656        let key_out = tp.0.map(|t| Key::decode(&t)).transpose()?;
657        let val_out = V::decode(&tp.1)?;
658        Ok((key_out, val_out, tp.2))
659    }
660
661    async fn get_value<V: TableObject>(
662        &mut self,
663        key: Option<Vec<u8>>,
664        value: Option<Vec<u8>>,
665        op: u32,
666    ) -> Result<Option<V>> {
667        let (_, v, _) = mdbx_try_optional!(self.get::<(), V>(key, value, op).await);
668
669        Ok(Some(v))
670    }
671
672    async fn get_full<Key: TableObject, V: TableObject>(
673        &mut self,
674        key: Option<Vec<u8>>,
675        value: Option<Vec<u8>>,
676        op: u32,
677    ) -> Result<Option<(Key, V)>> {
678        let (k, v, _) = mdbx_try_optional!(self.get::<Key, V>(key, value, op).await);
679
680        Ok(Some((k.unwrap(), v)))
681    }
682
683    pub async fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
684    where
685        Key: TableObject,
686        Value: TableObject,
687    {
688        self.get_full(None, None, MDBX_FIRST).await
689    }
690
691    pub async fn first_dup<Value>(&mut self) -> Result<Option<Value>>
692    where
693        Value: TableObject,
694    {
695        self.get_value(None, None, MDBX_FIRST_DUP).await
696    }
697
698    pub async fn get_both<Value>(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<Option<Value>>
699    where
700        Value: TableObject,
701    {
702        self.get_value(Some(k), Some(v), MDBX_GET_BOTH).await
703    }
704
705    pub async fn get_both_range<Value>(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<Option<Value>>
706    where
707        Value: TableObject,
708    {
709        self.get_value(Some(k), Some(v), MDBX_GET_BOTH_RANGE).await
710    }
711
712    pub async fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
713    where
714        Key: TableObject,
715        Value: TableObject,
716    {
717        self.get_full(None, None, MDBX_GET_CURRENT).await
718    }
719
720    pub async fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
721    where
722        Value: TableObject,
723    {
724        self.get_value(None, None, MDBX_GET_MULTIPLE).await
725    }
726
727    pub async fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
728    where
729        Key: TableObject,
730        Value: TableObject,
731    {
732        self.get_full(None, None, MDBX_LAST).await
733    }
734
735    pub async fn last_dup<Value>(&mut self) -> Result<Option<Value>>
736    where
737        Value: TableObject,
738    {
739        self.get_value(None, None, MDBX_LAST_DUP).await
740    }
741
742    pub async fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
743    where
744        Key: TableObject,
745        Value: TableObject,
746    {
747        self.get_full(None, None, MDBX_NEXT).await
748    }
749
750    pub async fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
751    where
752        Key: TableObject,
753        Value: TableObject,
754    {
755        self.get_full(None, None, MDBX_NEXT_DUP).await
756    }
757
758    pub async fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
759    where
760        Key: TableObject,
761        Value: TableObject,
762    {
763        self.get_full(None, None, MDBX_NEXT_MULTIPLE).await
764    }
765
766    pub async fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
767    where
768        Key: TableObject,
769        Value: TableObject,
770    {
771        self.get_full(None, None, MDBX_NEXT_NODUP).await
772    }
773
774    pub async fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
775    where
776        Key: TableObject,
777        Value: TableObject,
778    {
779        self.get_full(None, None, MDBX_PREV).await
780    }
781
782    pub async fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
783    where
784        Key: TableObject,
785        Value: TableObject,
786    {
787        self.get_full(None, None, MDBX_PREV_DUP).await
788    }
789
790    pub async fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
791    where
792        Key: TableObject,
793        Value: TableObject,
794    {
795        self.get_full(None, None, MDBX_PREV_NODUP).await
796    }
797
798    pub async fn set<Value>(&mut self, key: Vec<u8>) -> Result<Option<Value>>
799    where
800        Value: TableObject,
801    {
802        self.get_value(Some(key), None, MDBX_SET).await
803    }
804    pub async fn set_key<Key, Value>(&mut self, key: Vec<u8>) -> Result<Option<(Key, Value)>>
805    where
806        Key: TableObject,
807        Value: TableObject,
808    {
809        self.get_full(Some(key), None, MDBX_SET_KEY).await
810    }
811
812    pub async fn set_range<Key, Value>(&mut self, key: Vec<u8>) -> Result<Option<(Key, Value)>>
813    where
814        Key: TableObject,
815        Value: TableObject,
816    {
817        self.get_full(Some(key), None, MDBX_SET_RANGE).await
818    }
819
820    pub async fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
821    where
822        Key: TableObject,
823        Value: TableObject,
824    {
825        self.get_full(None, None, MDBX_PREV_MULTIPLE).await
826    }
827
828    pub async fn set_lowerbound<Key, Value>(
829        &mut self,
830        key: Vec<u8>,
831    ) -> Result<Option<(bool, Key, Value)>>
832    where
833        Key: TableObject,
834        Value: TableObject,
835    {
836        let (k, v, found) =
837            mdbx_try_optional!(self.get(Some(key), None, MDBX_SET_LOWERBOUND).await);
838
839        Ok(Some((found, k.unwrap(), v)))
840    }
841
842    fn stream_iter_buffered<'a, Key: TableObject + Send + 'a, Value: TableObject + Send + 'a>(
843        inner: Arc<RemoteCursorInner<K>>,
844        op: u32,
845        next_op: u32,
846        buffer_config: BufferConfiguration,
847    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>> {
848        let BufferConfiguration {
849            max_count,
850            max_buffer_bytes,
851        } = buffer_config;
852        let st = try_stream! {
853            let cur = inner;
854
855            let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
856                Ok(v) => {
857                    let k = Key::decode(&v.0.unwrap_or(Vec::new()))?;
858                    let v = Value::decode(&v.1)?;
859
860                    Ok(Some((k, v)))
861                },
862                Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => Ok(None),
863                Err(e) => {
864                    Err(ClientError::from(e))
865                }
866            };
867
868            let ret = ret?;
869            if let Some(ret) = ret {
870                yield ret;
871
872                loop {
873                    let vals = cur.tx.env.cl.batch_cur_get_full(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, max_count, max_buffer_bytes, next_op).await??;
874
875                    if vals.len() == 0 {
876                        break;
877                    }
878                    for (k, v) in vals.into_iter() {
879                        let k = Key::decode(&k)?;
880                        let v = Value::decode(&v)?;
881
882                        yield (k, v);
883                    }
884                }
885            }
886
887        };
888
889        Box::pin(st)
890    }
891
892    fn stream_iter_dup_buffered<
893        'a,
894        Key: TableObject + Send + 'a,
895        Value: TableObject + Send + 'a,
896    >(
897        inner: Arc<RemoteCursorInner<K>>,
898        op: u32,
899        buffer_config: BufferConfiguration,
900    ) -> Pin<
901        Box<
902            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
903                + Send
904                + 'a,
905        >,
906    > {
907        let st = try_stream! {
908            let cur = inner;
909            let mut op = op;
910            loop {
911                let op = std::mem::replace(&mut op, MDBX_NEXT_NODUP);
912
913                let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
914                    Ok(_) => {
915                        let new_cur = cur.cur_clone().await?;
916                        Ok(Self::stream_iter_buffered::<'a, Key, Value>(Arc::new(new_cur), MDBX_GET_CURRENT, MDBX_NEXT_DUP, buffer_config))
917                    },
918                    Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => break,
919                    Err(e) => {
920                        Err(ClientError::from(e))
921                    }
922                };
923
924                let ret = ret?;
925                yield ret;
926            }
927        };
928
929        Box::pin(st)
930    }
931
932    fn stream_iter<'a, Key: TableObject + Send + 'a, Value: TableObject + Send + 'a>(
933        inner: Arc<RemoteCursorInner<K>>,
934        op: u32,
935        next_op: u32,
936    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>> {
937        let st = try_stream! {
938            let cur = inner;
939            let mut op = op;
940            let next_op = next_op;
941            loop {
942                let op = std::mem::replace(&mut op, next_op);
943
944                let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
945                    Ok(v) => {
946                        let k = Key::decode(&v.0.unwrap_or(Vec::new()))?;
947                        let v = Value::decode(&v.1)?;
948
949                        Ok((k, v))
950                    },
951                    Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => break,
952                    Err(e) => {
953                        Err(ClientError::from(e))
954                    }
955                };
956
957                let ret = ret?;
958                yield ret;
959            }
960        };
961
962        Box::pin(st)
963    }
964
965    fn stream_iter_dup<'a, Key: TableObject + Send + 'a, Value: TableObject + Send + 'a>(
966        inner: Arc<RemoteCursorInner<K>>,
967        op: u32,
968    ) -> Pin<
969        Box<
970            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
971                + Send
972                + 'a,
973        >,
974    > {
975        let st = try_stream! {
976            let cur = inner;
977            let mut op = op;
978            loop {
979                let op = std::mem::replace(&mut op, MDBX_NEXT_NODUP);
980
981                let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
982                    Ok(_) => {
983                        let new_cur = cur.cur_clone().await?;
984                        Ok(Self::stream_iter::<'a, Key, Value>(Arc::new(new_cur), MDBX_GET_CURRENT, MDBX_NEXT_DUP))
985                    },
986                    Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => break,
987                    Err(e) => {
988                        Err(ClientError::from(e))
989                    }
990                };
991
992                let ret = ret?;
993                yield ret;
994            }
995        };
996
997        Box::pin(st)
998    }
999
1000    pub fn iter<'a, Key, Value>(
1001        &'a mut self,
1002    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1003    where
1004        Key: TableObject + Send + 'a,
1005        Value: TableObject + Send + 'a,
1006    {
1007        Self::stream_iter(self.inner.clone(), MDBX_NEXT, MDBX_NEXT)
1008    }
1009
1010    pub fn into_iter_buffered<'a, Key, Value>(
1011        self,
1012        buffer_config: BufferConfiguration,
1013    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1014    where
1015        Key: TableObject + Send + 'a,
1016        Value: TableObject + Send + 'a,
1017    {
1018        Self::stream_iter_buffered(self.inner.clone(), MDBX_NEXT, MDBX_NEXT, buffer_config)
1019    }
1020
1021    pub fn iter_start<'a, Key, Value>(
1022        &'a mut self,
1023    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1024    where
1025        Key: TableObject + Send + 'a,
1026        Value: TableObject + Send + 'a,
1027    {
1028        Self::stream_iter(self.inner.clone(), MDBX_FIRST, MDBX_NEXT)
1029    }
1030
1031    pub fn into_iter_start_buffered<'a, Key, Value>(
1032        self,
1033        buffer_config: BufferConfiguration,
1034    ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1035    where
1036        Key: TableObject + Send + 'a,
1037        Value: TableObject + Send + 'a,
1038    {
1039        Self::stream_iter_buffered(self.inner.clone(), MDBX_FIRST, MDBX_NEXT, buffer_config)
1040    }
1041
1042    pub async fn iter_from<'a, Key, Value>(
1043        &'a mut self,
1044        key: Vec<u8>,
1045    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1046    where
1047        Key: TableObject + Send + 'a,
1048        Value: TableObject + Send + 'a,
1049    {
1050        let _ = self.set_range::<(), ()>(key).await?;
1051
1052        Ok(Self::stream_iter(
1053            self.inner.clone(),
1054            MDBX_GET_CURRENT,
1055            MDBX_NEXT,
1056        ))
1057    }
1058
1059    pub async fn into_iter_from_buffered<'a, Key, Value>(
1060        mut self,
1061        key: Vec<u8>,
1062        buffer_config: BufferConfiguration,
1063    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1064    where
1065        Key: TableObject + Send + 'a,
1066        Value: TableObject + Send + 'a,
1067    {
1068        let _ = self.set_range::<(), ()>(key).await?;
1069
1070        Ok(Self::stream_iter_buffered(
1071            self.inner.clone(),
1072            MDBX_GET_CURRENT,
1073            MDBX_NEXT,
1074            buffer_config,
1075        ))
1076    }
1077
1078    pub fn iter_dup<'a, Key, Value>(
1079        &'a mut self,
1080    ) -> Pin<
1081        Box<
1082            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1083                + Send
1084                + 'a,
1085        >,
1086    >
1087    where
1088        Key: TableObject + Send + 'a,
1089        Value: TableObject + Send + 'a,
1090    {
1091        Self::stream_iter_dup(self.inner.clone(), MDBX_NEXT)
1092    }
1093
1094    pub fn into_iter_dup_buffered<'a, Key, Value>(
1095        self,
1096        buffer_config: BufferConfiguration,
1097    ) -> Pin<
1098        Box<
1099            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1100                + Send
1101                + 'a,
1102        >,
1103    >
1104    where
1105        Key: TableObject + Send + 'a,
1106        Value: TableObject + Send + 'a,
1107    {
1108        Self::stream_iter_dup_buffered(self.inner.clone(), MDBX_NEXT, buffer_config)
1109    }
1110
1111    pub fn iter_dup_start<'a, Key, Value>(
1112        &'a mut self,
1113    ) -> Pin<
1114        Box<
1115            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1116                + Send
1117                + 'a,
1118        >,
1119    >
1120    where
1121        Key: TableObject + Send + 'a,
1122        Value: TableObject + Send + 'a,
1123    {
1124        Self::stream_iter_dup(self.inner.clone(), MDBX_FIRST)
1125    }
1126
1127    pub fn into_iter_dup_start_buffered<'a, Key, Value>(
1128        self,
1129        buffer_config: BufferConfiguration,
1130    ) -> Pin<
1131        Box<
1132            dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1133                + Send
1134                + 'a,
1135        >,
1136    >
1137    where
1138        Key: TableObject + Send + 'a,
1139        Value: TableObject + Send + 'a,
1140    {
1141        Self::stream_iter_dup_buffered(self.inner.clone(), MDBX_FIRST, buffer_config)
1142    }
1143
1144    pub async fn iter_dup_from<'a, Key, Value>(
1145        &'a mut self,
1146        key: Vec<u8>,
1147    ) -> Result<
1148        Pin<
1149            Box<
1150                dyn Stream<
1151                        Item = Result<
1152                            Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
1153                        >,
1154                    > + Send
1155                    + 'a,
1156            >,
1157        >,
1158    >
1159    where
1160        Key: TableObject + Send + 'a,
1161        Value: TableObject + Send + 'a,
1162    {
1163        let _ = self.set_range::<(), ()>(key).await?;
1164        Ok(Self::stream_iter_dup(self.inner.clone(), MDBX_GET_CURRENT))
1165    }
1166
1167    pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
1168        mut self,
1169        key: Vec<u8>,
1170        buffer_config: BufferConfiguration,
1171    ) -> Result<
1172        Pin<
1173            Box<
1174                dyn Stream<
1175                        Item = Result<
1176                            Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
1177                        >,
1178                    > + Send
1179                    + 'a,
1180            >,
1181        >,
1182    >
1183    where
1184        Key: TableObject + Send + 'a,
1185        Value: TableObject + Send + 'a,
1186    {
1187        let _ = self.set_range::<(), ()>(key).await?;
1188        Ok(Self::stream_iter_dup_buffered(
1189            self.inner.clone(),
1190            MDBX_GET_CURRENT,
1191            buffer_config,
1192        ))
1193    }
1194
1195    pub async fn iter_dup_of<'a, Key, Value>(
1196        &'a mut self,
1197        key: Vec<u8>,
1198    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1199    where
1200        Key: TableObject + Send + 'a,
1201        Value: TableObject + Send + 'a,
1202    {
1203        let res = self.set::<()>(key).await?;
1204        if let Some(_) = res {
1205            Ok(Self::stream_iter(
1206                self.inner.clone(),
1207                MDBX_GET_CURRENT,
1208                MDBX_NEXT_DUP,
1209            ))
1210        } else {
1211            let _ = self.last::<(), ()>().await?;
1212            Ok(Self::stream_iter(self.inner.clone(), MDBX_NEXT, MDBX_NEXT))
1213        }
1214    }
1215
1216    pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
1217        mut self,
1218        key: Vec<u8>,
1219        buffer_config: BufferConfiguration,
1220    ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1221    where
1222        Key: TableObject + Send + 'a,
1223        Value: TableObject + Send + 'a,
1224    {
1225        let res = self.set::<()>(key).await?;
1226        if let Some(_) = res {
1227            Ok(Self::stream_iter(
1228                self.inner.clone(),
1229                MDBX_GET_CURRENT,
1230                MDBX_NEXT_DUP,
1231            ))
1232        } else {
1233            let _ = self.last::<(), ()>().await?;
1234            Ok(Self::stream_iter_buffered(
1235                self.inner.clone(),
1236                MDBX_NEXT,
1237                MDBX_NEXT,
1238                buffer_config,
1239            ))
1240        }
1241    }
1242}
1243
1244impl RemoteCursor<RW> {
1245    pub async fn put(&mut self, key: Vec<u8>, data: Vec<u8>, flags: WriteFlags) -> Result<()> {
1246        self.inner
1247            .tx
1248            .env
1249            .cl
1250            .cur_put(
1251                self.inner.tx.env.context(),
1252                self.inner.tx.env.handle,
1253                self.inner.tx.handle,
1254                self.inner.handle,
1255                key,
1256                data,
1257                flags.bits(),
1258            )
1259            .await??;
1260        Ok(())
1261    }
1262
1263    pub async fn del(&mut self, flags: WriteFlags) -> Result<()> {
1264        self.inner
1265            .tx
1266            .env
1267            .cl
1268            .cur_del(
1269                self.inner.tx.env.context(),
1270                self.inner.tx.env.handle,
1271                self.inner.tx.handle,
1272                self.inner.handle,
1273                flags.bits(),
1274            )
1275            .await??;
1276        Ok(())
1277    }
1278}