1use std::{
2 future::Future,
3 marker::PhantomData,
4 path::PathBuf,
5 pin::Pin,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9 },
10 time::Duration,
11};
12
13use async_stream::try_stream;
14use ffi::{
15 MDBX_FIRST, MDBX_FIRST_DUP, MDBX_GET_BOTH, MDBX_GET_BOTH_RANGE, MDBX_GET_CURRENT,
16 MDBX_GET_MULTIPLE, MDBX_LAST, MDBX_LAST_DUP, MDBX_NEXT, MDBX_NEXT_DUP, MDBX_NEXT_MULTIPLE,
17 MDBX_NEXT_NODUP, MDBX_PREV, MDBX_PREV_DUP, MDBX_PREV_MULTIPLE, MDBX_PREV_NODUP, MDBX_SET,
18 MDBX_SET_KEY, MDBX_SET_LOWERBOUND, MDBX_SET_RANGE,
19};
20use tarpc::{client::NewClient, context::Context};
21use thiserror::Error;
22use tokio::{runtime::Handle, sync::oneshot};
23use tokio_stream::Stream;
24
25use crate::{
26 environment::RemoteEnvironmentConfig,
27 service::{RemoteMDBXClient, ServerError},
28 CommitLatency, DatabaseFlags, EnvironmentBuilder, EnvironmentKind, Info, Mode, Stat,
29 TableObject, TransactionKind, WriteFlags, RO, RW,
30};
31
/// Unwraps a `Result<T, ClientError>` inside a function that returns `Result<Option<_>>`.
///
/// * `Ok(v)` evaluates to `v`.
/// * An MDBX `NotFound` or `NoData` error makes the enclosing function return `Ok(None)`.
/// * Any other error makes the enclosing function return that error unchanged.
macro_rules! mdbx_try_optional {
    ($expr:expr) => {{
        match $expr {
            Ok(value) => value,
            Err(ClientError::MDBX(
                crate::error::Error::NotFound | crate::error::Error::NoData,
            )) => return Ok(None),
            Err(other) => return Err(other),
        }
    }};
}
43
44fn escape_to_async<F, O>(fut: F) -> O
45where
46 F: Future<Output = O>,
47{
48 match Handle::try_current() {
49 Ok(handle) => tokio::task::block_in_place(move || handle.block_on(fut)),
50 Err(_) => tokio::runtime::Builder::new_current_thread()
51 .enable_all()
52 .build()
53 .unwrap()
54 .block_on(fut),
55 }
56}
57
58fn context_deadline(duration: Duration) -> Context {
59 let mut ctx = Context::current();
60 ctx.deadline = std::time::SystemTime::now() + duration;
61 ctx
62}
63
/// Limits handed to the server for each batch fetched by the buffered cursor streams.
#[derive(Debug, Clone, Copy)]
pub struct BufferConfiguration {
    /// Forwarded as the `max_count` argument of each batch request.
    pub max_count: u64,
    /// Forwarded as the `max_buffer_bytes` argument of each batch request.
    pub max_buffer_bytes: u64,
}

impl Default for BufferConfiguration {
    /// Defaults to 8192 entries and 16 MiB per batch.
    fn default() -> Self {
        Self::new(8192, 16 * 1024 * 1024)
    }
}

impl BufferConfiguration {
    /// Creates a configuration from an entry count limit and a byte limit.
    pub fn new(count: u64, bytes: u64) -> Self {
        BufferConfiguration {
            max_count: count,
            max_buffer_bytes: bytes,
        }
    }

    /// Returns a copy of `self` with `max_count` replaced by `count`.
    pub fn max_count(self, count: u64) -> Self {
        Self {
            max_count: count,
            ..self
        }
    }

    /// Returns a copy of `self` with `max_buffer_bytes` replaced by `bytes`.
    pub fn max_buffer_bytes(self, bytes: u64) -> Self {
        Self {
            max_buffer_bytes: bytes,
            ..self
        }
    }
}
95
/// Errors surfaced by the remote MDBX client.
#[derive(Debug, Error)]
pub enum ClientError {
    /// An MDBX error, either raised locally (e.g. while decoding) or unwrapped
    /// from a `ServerError::MBDX` reply.
    #[error("MDBX error: {0}")]
    MDBX(crate::error::Error),
    /// The tarpc call itself failed.
    #[error("RPC error: {0}")]
    RPC(tarpc::client::RpcError),
    /// A value could not be parsed.
    #[error("parse error")]
    ParseError,
    /// A URL could not be parsed.
    #[error("url parse error: {0}")]
    WrongURL(url::ParseError),
    /// An I/O error.
    #[error("IO error: {0}")]
    IO(std::io::Error),
    /// A server-side error other than an MDBX error.
    #[error("Server error: {0}")]
    Server(ServerError),
    /// A Tokio task failed to join.
    #[error("Runtime error: {0}")]
    Runtime(tokio::task::JoinError),
}
113
114impl From<tokio::task::JoinError> for ClientError {
115 fn from(value: tokio::task::JoinError) -> Self {
116 Self::Runtime(value)
117 }
118}
119
120impl From<std::io::Error> for ClientError {
121 fn from(value: std::io::Error) -> Self {
122 Self::IO(value)
123 }
124}
125
126impl From<tarpc::client::RpcError> for ClientError {
127 fn from(value: tarpc::client::RpcError) -> Self {
128 Self::RPC(value)
129 }
130}
131
132impl From<crate::error::Error> for ClientError {
133 fn from(value: crate::error::Error) -> Self {
134 Self::MDBX(value)
135 }
136}
137
138impl From<ServerError> for ClientError {
139 fn from(value: ServerError) -> Self {
140 match value {
141 ServerError::MBDX(e) => Self::MDBX(e),
142 _ => Self::Server(value),
143 }
144 }
145}
146
/// Result alias used throughout this module, with [`ClientError`] as the error type.
type Result<T> = std::result::Result<T, ClientError>;
148
/// Shared state behind a [`RemoteEnvironment`]: the server-side environment
/// handle plus everything needed to issue RPCs against it.
#[derive(Debug)]
pub(crate) struct RemoteEnvironmentInner {
    /// Environment handle returned by the server's `open_env` call.
    handle: u64,
    /// RPC client used for every request.
    cl: RemoteMDBXClient,
    /// Time budget applied to each RPC context (see `context`).
    deadline: Duration,
    /// Environment kind derived from the builder configuration.
    kind: EnvironmentKind,
    /// Signals the spawned dispatcher task to exit; fired from `Drop`.
    ch: oneshot::Sender<()>,
}
157
158impl RemoteEnvironmentInner {
159 fn context(&self) -> Context {
160 context_deadline(self.deadline)
161 }
162
163 async fn new<D, E>(
164 path: PathBuf,
165 builder: EnvironmentBuilder,
166 client: RemoteMDBXClient,
167 dispatcher: D,
168 deadline: Duration,
169 ) -> Result<Self>
170 where
171 D: Future<Output = std::result::Result<(), E>> + Send + 'static,
172 E: std::error::Error + Send + Sync + 'static,
173 {
174 let (tx, rx) = oneshot::channel();
176 tokio::spawn(async move {
177 tokio::select! {
178 _ = rx => {
179 tracing::info!("Exiting from dispatcher due to closer");
180 },
181 e = dispatcher => {
182 tracing::warn!("Dispatcher exits with {:?}", e);
183 }
184 }
185 tracing::debug!("Dispatcher dies");
186 });
187 let remote = RemoteEnvironmentConfig::from(builder);
188 let kind = remote.env_kind();
189 let handle = client
190 .open_env(context_deadline(deadline), path, remote)
191 .await??;
192 Ok(Self {
193 handle: handle,
194 cl: client,
195 deadline: deadline,
196 kind: kind,
197 ch: tx,
198 })
199 }
200
201 async fn close(&self) -> Result<()> {
202 self.cl.env_close(self.context(), self.handle).await??;
203 Ok(())
204 }
205}
206
207impl Drop for RemoteEnvironmentInner {
208 fn drop(&mut self) {
209 if let Err(e) = escape_to_async(self.close()) {
210 tracing::warn!("Fail to close env {} due to {}", self.handle, e);
211 }
212 let (mut t, _) = oneshot::channel();
213 std::mem::swap(&mut self.ch, &mut t); if let Err(_) = t.send(()) {
215 tracing::warn!("Fail to close dispatcher");
216 }
217 }
218}
219
/// Cloneable handle to an MDBX environment opened on a remote server.
///
/// Clones share the same underlying state; the remote environment is closed when
/// the last reference to that state is dropped.
#[derive(Clone, Debug)]
pub struct RemoteEnvironment {
    /// Shared environment state.
    inner: Arc<RemoteEnvironmentInner>,
}
224
225impl RemoteEnvironment {
226 pub async fn open_with_builder<D, E>(
227 path: PathBuf,
228 builder: EnvironmentBuilder,
229 client: NewClient<RemoteMDBXClient, D>,
230 deadline: Duration,
231 ) -> Result<Self>
232 where
233 D: Future<Output = std::result::Result<(), E>> + Send + 'static,
234 E: std::error::Error + Send + Sync + 'static,
235 {
236 Ok(Self {
237 inner: Arc::new(
238 RemoteEnvironmentInner::new(
239 path,
240 builder,
241 client.client,
242 client.dispatch,
243 deadline,
244 )
245 .await?,
246 ),
247 })
248 }
249
250 pub async fn begin_ro_txn(&self) -> Result<RemoteTransaction<RO>> {
251 let tx = self
252 .inner
253 .cl
254 .env_ro_tx(self.inner.context(), self.inner.handle)
255 .await??;
256 Ok(RemoteTransaction::new(self.inner.clone(), tx))
257 }
258
259 pub async fn begin_rw_txn(&self) -> Result<RemoteTransaction<RW>> {
260 let tx = self
261 .inner
262 .cl
263 .env_rw_tx(self.inner.context(), self.inner.handle)
264 .await??;
265 Ok(RemoteTransaction::new(self.inner.clone(), tx))
266 }
267
268 pub async fn sync(&self, force: bool) -> Result<bool> {
269 Ok(self
270 .inner
271 .cl
272 .env_sync(self.inner.context(), self.inner.handle, force)
273 .await??)
274 }
275
276 pub async fn stat(&self) -> Result<Stat> {
277 Ok(self
278 .inner
279 .cl
280 .env_stat(self.inner.context(), self.inner.handle)
281 .await??)
282 }
283
284 pub async fn info(&self) -> Result<Info> {
285 Ok(self
286 .inner
287 .cl
288 .env_info(self.inner.context(), self.inner.handle)
289 .await??)
290 }
291
292 pub fn env_kind(&self) -> EnvironmentKind {
293 self.inner.kind
294 }
295
296 pub fn is_write_map(&self) -> bool {
297 self.inner.kind.is_write_map()
298 }
299
300 pub async fn is_read_write(&self) -> Result<bool> {
301 Ok(!self.is_read_only().await?)
302 }
303
304 pub async fn is_read_only(&self) -> Result<bool> {
305 Ok(matches!(self.info().await?.mode(), Mode::ReadOnly))
306 }
307}
308
/// State behind a [`RemoteDatabase`].
#[derive(Debug, Clone)]
struct RemoteDatabaseInner {
    /// Database handle returned by the server.
    dbi: u32,
    /// Keeps the owning environment alive for as long as this database handle exists.
    _env: Arc<RemoteEnvironmentInner>,
}
314
/// Cloneable handle to a database opened inside a remote environment.
#[derive(Debug, Clone)]
pub struct RemoteDatabase {
    /// Shared database state.
    inner: Arc<RemoteDatabaseInner>,
}
319
320impl RemoteDatabase {
321 pub fn dbi(&self) -> u32 {
322 self.inner.dbi
323 }
324}
325
/// State behind a [`RemoteTransaction`].
///
/// NOTE(review): `Clone` is derived although `Drop` talks to the server; a clone of
/// this struct (as opposed to a clone of the surrounding `Arc`) would run that
/// clean-up twice — confirm nothing clones it directly.
#[derive(Debug, Clone)]
struct RemoteTransactionInner<K: TransactionKind> {
    /// Environment the transaction belongs to.
    env: Arc<RemoteEnvironmentInner>,
    /// Transaction handle returned by the server.
    handle: u64,
    /// Set once `commit` succeeded, so that dropping does not abort.
    committed: Arc<AtomicBool>,
    /// Ties the transaction to its kind (`RO` / `RW`) at the type level.
    _ph: PhantomData<K>,
}
333
334impl<K: TransactionKind> RemoteTransactionInner<K> {
335 async fn close(&self) -> Result<()> {
336 let commited = self.committed.load(Ordering::SeqCst);
337 if !commited && !K::IS_READ_ONLY {
338 self.env
339 .cl
340 .tx_abort(self.env.context(), self.env.handle, self.handle)
341 .await??;
342 }
343 Ok(())
344 }
345}
346
347impl<K: TransactionKind> Drop for RemoteTransactionInner<K> {
348 fn drop(&mut self) {
349 if let Err(e) = escape_to_async(self.close()) {
350 tracing::warn!("Fail to close tx {} due to {}", self.handle, e);
351 }
352 }
353}
354
/// Cloneable handle to a transaction running on the remote server.
///
/// `K` selects the transaction kind (`RO` or `RW`) and with it the set of
/// available operations.
#[derive(Debug, Clone)]
pub struct RemoteTransaction<K: TransactionKind> {
    /// Shared transaction state.
    inner: Arc<RemoteTransactionInner<K>>,
}
359
360impl<K: TransactionKind> RemoteTransaction<K> {
361 pub(crate) fn new(env: Arc<RemoteEnvironmentInner>, handle: u64) -> Self {
362 Self {
363 inner: Arc::new(RemoteTransactionInner {
364 env: env,
365 handle: handle,
366 committed: Arc::new(AtomicBool::new(false)),
367 _ph: PhantomData::default(),
368 }),
369 }
370 }
371
372 async fn open_db_with_flags(
373 &self,
374 db: Option<String>,
375 flags: DatabaseFlags,
376 ) -> Result<RemoteDatabase> {
377 let dbi = self
378 .inner
379 .env
380 .cl
381 .tx_create_db(
382 self.inner.env.context(),
383 self.inner.env.handle,
384 self.inner.handle,
385 db,
386 flags.bits(),
387 )
388 .await??;
389
390 Ok(RemoteDatabase {
391 inner: Arc::new(RemoteDatabaseInner {
392 dbi: dbi,
393 _env: self.inner.env.clone(),
394 }),
395 })
396 }
397
398 pub async fn db_stat_with_dbi(&self, dbi: u32) -> Result<Stat> {
399 let stat = self
400 .inner
401 .env
402 .cl
403 .tx_db_stat(
404 self.inner.env.context(),
405 self.inner.env.handle,
406 self.inner.handle,
407 dbi,
408 )
409 .await??;
410
411 Ok(stat)
412 }
413
414 pub async fn db_stat(&self, db: &RemoteDatabase) -> Result<Stat> {
415 self.db_stat_with_dbi(db.dbi()).await
416 }
417
418 pub async fn open_db(&self, db: Option<String>) -> Result<RemoteDatabase> {
419 self.open_db_with_flags(db, DatabaseFlags::empty()).await
420 }
421
422 pub async fn get<V: TableObject>(&self, dbi: u32, key: Vec<u8>) -> Result<Option<V>> {
423 let v = self
424 .inner
425 .env
426 .cl
427 .tx_get(
428 self.inner.env.context(),
429 self.inner.env.handle,
430 self.inner.handle,
431 dbi,
432 key,
433 )
434 .await??;
435
436 v.map(|t| V::decode(&t)).transpose().map_err(|e| e.into())
437 }
438
439 pub async fn cursor(&self, dbi: u32) -> Result<RemoteCursor<K>> {
440 let cur = if K::IS_READ_ONLY {
441 self.inner
442 .env
443 .cl
444 .tx_ro_cursor(
445 self.inner.env.context(),
446 self.inner.env.handle,
447 self.inner.handle,
448 dbi,
449 )
450 .await??
451 } else {
452 self.inner
453 .env
454 .cl
455 .tx_rw_cursor(
456 self.inner.env.context(),
457 self.inner.env.handle,
458 self.inner.handle,
459 dbi,
460 )
461 .await??
462 };
463
464 Ok(RemoteCursor {
465 inner: Arc::new(RemoteCursorInner {
466 tx: self.inner.clone(),
467 handle: cur,
468 }),
469 })
470 }
471}
472
473impl RemoteTransaction<RW> {
474 pub async fn begin_nested_txn(&mut self) -> Result<Self> {
475 let handle = self
476 .inner
477 .env
478 .cl
479 .tx_nested(
480 self.inner.env.context(),
481 self.inner.env.handle,
482 self.inner.handle,
483 )
484 .await??;
485 Ok(Self::new(self.inner.env.clone(), handle))
486 }
487
488 pub async fn clear_db(&self, dbi: u32) -> Result<()> {
489 self.inner
490 .env
491 .cl
492 .clear_db(
493 self.inner.env.context(),
494 self.inner.env.handle,
495 self.inner.handle,
496 dbi,
497 )
498 .await??;
499
500 Ok(())
501 }
502
503 pub async fn put(
504 &self,
505 dbi: u32,
506 key: Vec<u8>,
507 data: Vec<u8>,
508 flags: WriteFlags,
509 ) -> Result<()> {
510 self.inner
511 .env
512 .cl
513 .tx_put(
514 self.inner.env.context(),
515 self.inner.env.handle,
516 self.inner.handle,
517 dbi,
518 key,
519 data,
520 flags.bits(),
521 )
522 .await??;
523 Ok(())
524 }
525
526 pub async fn del(&self, dbi: u32, key: Vec<u8>, value: Option<Vec<u8>>) -> Result<bool> {
527 let ret = self
528 .inner
529 .env
530 .cl
531 .tx_del(
532 self.inner.env.context(),
533 self.inner.env.handle,
534 self.inner.handle,
535 dbi,
536 key,
537 value,
538 )
539 .await??;
540 Ok(ret)
541 }
542
543 pub async fn create_db(
544 &self,
545 db: Option<String>,
546 flags: DatabaseFlags,
547 ) -> Result<RemoteDatabase> {
548 self.open_db_with_flags(db, flags | DatabaseFlags::CREATE)
549 .await
550 }
551
552 pub async fn commit(self) -> Result<(bool, CommitLatency)> {
553 tracing::debug!("going to commit tx {}", self.inner.handle);
554 let (ret, lat) = self
555 .inner
556 .env
557 .cl
558 .tx_commit(
559 self.inner.env.context(),
560 self.inner.env.handle,
561 self.inner.handle,
562 )
563 .await??;
564 self.inner.committed.store(true, Ordering::SeqCst);
565 Ok((ret, lat))
566 }
567}
568
/// State behind a [`RemoteCursor`].
///
/// NOTE(review): `Clone` is derived although `Drop` closes the server-side cursor;
/// a clone of this struct (as opposed to a clone of the surrounding `Arc`) would
/// close the same handle twice — confirm nothing clones it directly.
#[derive(Debug, Clone)]
struct RemoteCursorInner<K: TransactionKind> {
    /// Transaction the cursor belongs to; kept alive for the cursor's lifetime.
    tx: Arc<RemoteTransactionInner<K>>,
    /// Cursor handle returned by the server.
    handle: u64,
}
574
575impl<K: TransactionKind> RemoteCursorInner<K> {
576 async fn close(&self) -> Result<()> {
577 self.tx
578 .env
579 .cl
580 .cur_close(
581 self.tx.env.context(),
582 self.tx.env.handle,
583 self.tx.handle,
584 self.handle,
585 )
586 .await??;
587 Ok(())
588 }
589
590 async fn cur_clone(&self) -> Result<Self> {
591 let new_cur = self
592 .tx
593 .env
594 .cl
595 .cur_create(
596 self.tx.env.context(),
597 self.tx.env.handle,
598 self.tx.handle,
599 self.handle,
600 )
601 .await??;
602 Ok(Self {
603 tx: self.tx.clone(),
604 handle: new_cur,
605 })
606 }
607}
608
609impl<K: TransactionKind> Drop for RemoteCursorInner<K> {
610 fn drop(&mut self) {
611 if let Err(e) = escape_to_async(self.close()) {
612 tracing::warn!(
613 "Fail to close cursor {} of tx {} from env {} due to {}",
614 self.handle,
615 self.tx.handle,
616 self.tx.env.handle,
617 e
618 );
619 }
620 }
621}
622
/// Cursor over a database inside a remote transaction of kind `K`.
#[derive(Debug)]
pub struct RemoteCursor<K: TransactionKind> {
    /// Shared cursor state; streams created from the cursor hold extra references.
    inner: Arc<RemoteCursorInner<K>>,
}
627
628impl<K: TransactionKind> RemoteCursor<K> {
629 pub async fn cur_clone(&self) -> Result<Self> {
630 Ok(Self {
631 inner: Arc::new(self.inner.cur_clone().await?),
632 })
633 }
634
635 async fn get<Key: TableObject, V: TableObject>(
636 &self,
637 key: Option<Vec<u8>>,
638 data: Option<Vec<u8>>,
639 op: u32,
640 ) -> Result<(Option<Key>, V, bool)> {
641 let tp = self
642 .inner
643 .tx
644 .env
645 .cl
646 .cur_get(
647 self.inner.tx.env.context(),
648 self.inner.tx.env.handle,
649 self.inner.tx.handle,
650 self.inner.handle,
651 key,
652 data,
653 op,
654 )
655 .await??;
656 let key_out = tp.0.map(|t| Key::decode(&t)).transpose()?;
657 let val_out = V::decode(&tp.1)?;
658 Ok((key_out, val_out, tp.2))
659 }
660
661 async fn get_value<V: TableObject>(
662 &mut self,
663 key: Option<Vec<u8>>,
664 value: Option<Vec<u8>>,
665 op: u32,
666 ) -> Result<Option<V>> {
667 let (_, v, _) = mdbx_try_optional!(self.get::<(), V>(key, value, op).await);
668
669 Ok(Some(v))
670 }
671
672 async fn get_full<Key: TableObject, V: TableObject>(
673 &mut self,
674 key: Option<Vec<u8>>,
675 value: Option<Vec<u8>>,
676 op: u32,
677 ) -> Result<Option<(Key, V)>> {
678 let (k, v, _) = mdbx_try_optional!(self.get::<Key, V>(key, value, op).await);
679
680 Ok(Some((k.unwrap(), v)))
681 }
682
683 pub async fn first<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
684 where
685 Key: TableObject,
686 Value: TableObject,
687 {
688 self.get_full(None, None, MDBX_FIRST).await
689 }
690
691 pub async fn first_dup<Value>(&mut self) -> Result<Option<Value>>
692 where
693 Value: TableObject,
694 {
695 self.get_value(None, None, MDBX_FIRST_DUP).await
696 }
697
698 pub async fn get_both<Value>(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<Option<Value>>
699 where
700 Value: TableObject,
701 {
702 self.get_value(Some(k), Some(v), MDBX_GET_BOTH).await
703 }
704
705 pub async fn get_both_range<Value>(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<Option<Value>>
706 where
707 Value: TableObject,
708 {
709 self.get_value(Some(k), Some(v), MDBX_GET_BOTH_RANGE).await
710 }
711
712 pub async fn get_current<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
713 where
714 Key: TableObject,
715 Value: TableObject,
716 {
717 self.get_full(None, None, MDBX_GET_CURRENT).await
718 }
719
720 pub async fn get_multiple<Value>(&mut self) -> Result<Option<Value>>
721 where
722 Value: TableObject,
723 {
724 self.get_value(None, None, MDBX_GET_MULTIPLE).await
725 }
726
727 pub async fn last<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
728 where
729 Key: TableObject,
730 Value: TableObject,
731 {
732 self.get_full(None, None, MDBX_LAST).await
733 }
734
735 pub async fn last_dup<Value>(&mut self) -> Result<Option<Value>>
736 where
737 Value: TableObject,
738 {
739 self.get_value(None, None, MDBX_LAST_DUP).await
740 }
741
742 pub async fn next<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
743 where
744 Key: TableObject,
745 Value: TableObject,
746 {
747 self.get_full(None, None, MDBX_NEXT).await
748 }
749
750 pub async fn next_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
751 where
752 Key: TableObject,
753 Value: TableObject,
754 {
755 self.get_full(None, None, MDBX_NEXT_DUP).await
756 }
757
758 pub async fn next_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
759 where
760 Key: TableObject,
761 Value: TableObject,
762 {
763 self.get_full(None, None, MDBX_NEXT_MULTIPLE).await
764 }
765
766 pub async fn next_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
767 where
768 Key: TableObject,
769 Value: TableObject,
770 {
771 self.get_full(None, None, MDBX_NEXT_NODUP).await
772 }
773
774 pub async fn prev<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
775 where
776 Key: TableObject,
777 Value: TableObject,
778 {
779 self.get_full(None, None, MDBX_PREV).await
780 }
781
782 pub async fn prev_dup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
783 where
784 Key: TableObject,
785 Value: TableObject,
786 {
787 self.get_full(None, None, MDBX_PREV_DUP).await
788 }
789
790 pub async fn prev_nodup<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
791 where
792 Key: TableObject,
793 Value: TableObject,
794 {
795 self.get_full(None, None, MDBX_PREV_NODUP).await
796 }
797
798 pub async fn set<Value>(&mut self, key: Vec<u8>) -> Result<Option<Value>>
799 where
800 Value: TableObject,
801 {
802 self.get_value(Some(key), None, MDBX_SET).await
803 }
804 pub async fn set_key<Key, Value>(&mut self, key: Vec<u8>) -> Result<Option<(Key, Value)>>
805 where
806 Key: TableObject,
807 Value: TableObject,
808 {
809 self.get_full(Some(key), None, MDBX_SET_KEY).await
810 }
811
812 pub async fn set_range<Key, Value>(&mut self, key: Vec<u8>) -> Result<Option<(Key, Value)>>
813 where
814 Key: TableObject,
815 Value: TableObject,
816 {
817 self.get_full(Some(key), None, MDBX_SET_RANGE).await
818 }
819
820 pub async fn prev_multiple<Key, Value>(&mut self) -> Result<Option<(Key, Value)>>
821 where
822 Key: TableObject,
823 Value: TableObject,
824 {
825 self.get_full(None, None, MDBX_PREV_MULTIPLE).await
826 }
827
828 pub async fn set_lowerbound<Key, Value>(
829 &mut self,
830 key: Vec<u8>,
831 ) -> Result<Option<(bool, Key, Value)>>
832 where
833 Key: TableObject,
834 Value: TableObject,
835 {
836 let (k, v, found) =
837 mdbx_try_optional!(self.get(Some(key), None, MDBX_SET_LOWERBOUND).await);
838
839 Ok(Some((found, k.unwrap(), v)))
840 }
841
842 fn stream_iter_buffered<'a, Key: TableObject + Send + 'a, Value: TableObject + Send + 'a>(
843 inner: Arc<RemoteCursorInner<K>>,
844 op: u32,
845 next_op: u32,
846 buffer_config: BufferConfiguration,
847 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>> {
848 let BufferConfiguration {
849 max_count,
850 max_buffer_bytes,
851 } = buffer_config;
852 let st = try_stream! {
853 let cur = inner;
854
855 let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
856 Ok(v) => {
857 let k = Key::decode(&v.0.unwrap_or(Vec::new()))?;
858 let v = Value::decode(&v.1)?;
859
860 Ok(Some((k, v)))
861 },
862 Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => Ok(None),
863 Err(e) => {
864 Err(ClientError::from(e))
865 }
866 };
867
868 let ret = ret?;
869 if let Some(ret) = ret {
870 yield ret;
871
872 loop {
873 let vals = cur.tx.env.cl.batch_cur_get_full(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, max_count, max_buffer_bytes, next_op).await??;
874
875 if vals.len() == 0 {
876 break;
877 }
878 for (k, v) in vals.into_iter() {
879 let k = Key::decode(&k)?;
880 let v = Value::decode(&v)?;
881
882 yield (k, v);
883 }
884 }
885 }
886
887 };
888
889 Box::pin(st)
890 }
891
892 fn stream_iter_dup_buffered<
893 'a,
894 Key: TableObject + Send + 'a,
895 Value: TableObject + Send + 'a,
896 >(
897 inner: Arc<RemoteCursorInner<K>>,
898 op: u32,
899 buffer_config: BufferConfiguration,
900 ) -> Pin<
901 Box<
902 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
903 + Send
904 + 'a,
905 >,
906 > {
907 let st = try_stream! {
908 let cur = inner;
909 let mut op = op;
910 loop {
911 let op = std::mem::replace(&mut op, MDBX_NEXT_NODUP);
912
913 let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
914 Ok(_) => {
915 let new_cur = cur.cur_clone().await?;
916 Ok(Self::stream_iter_buffered::<'a, Key, Value>(Arc::new(new_cur), MDBX_GET_CURRENT, MDBX_NEXT_DUP, buffer_config))
917 },
918 Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => break,
919 Err(e) => {
920 Err(ClientError::from(e))
921 }
922 };
923
924 let ret = ret?;
925 yield ret;
926 }
927 };
928
929 Box::pin(st)
930 }
931
932 fn stream_iter<'a, Key: TableObject + Send + 'a, Value: TableObject + Send + 'a>(
933 inner: Arc<RemoteCursorInner<K>>,
934 op: u32,
935 next_op: u32,
936 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>> {
937 let st = try_stream! {
938 let cur = inner;
939 let mut op = op;
940 let next_op = next_op;
941 loop {
942 let op = std::mem::replace(&mut op, next_op);
943
944 let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
945 Ok(v) => {
946 let k = Key::decode(&v.0.unwrap_or(Vec::new()))?;
947 let v = Value::decode(&v.1)?;
948
949 Ok((k, v))
950 },
951 Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => break,
952 Err(e) => {
953 Err(ClientError::from(e))
954 }
955 };
956
957 let ret = ret?;
958 yield ret;
959 }
960 };
961
962 Box::pin(st)
963 }
964
965 fn stream_iter_dup<'a, Key: TableObject + Send + 'a, Value: TableObject + Send + 'a>(
966 inner: Arc<RemoteCursorInner<K>>,
967 op: u32,
968 ) -> Pin<
969 Box<
970 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
971 + Send
972 + 'a,
973 >,
974 > {
975 let st = try_stream! {
976 let cur = inner;
977 let mut op = op;
978 loop {
979 let op = std::mem::replace(&mut op, MDBX_NEXT_NODUP);
980
981 let ret = match cur.tx.env.cl.cur_get(cur.tx.env.context(), cur.tx.env.handle, cur.tx.handle, cur.handle, None, None, op).await? {
982 Ok(_) => {
983 let new_cur = cur.cur_clone().await?;
984 Ok(Self::stream_iter::<'a, Key, Value>(Arc::new(new_cur), MDBX_GET_CURRENT, MDBX_NEXT_DUP))
985 },
986 Err(ServerError::MBDX(crate::Error::NoData | crate::Error::NotFound)) => break,
987 Err(e) => {
988 Err(ClientError::from(e))
989 }
990 };
991
992 let ret = ret?;
993 yield ret;
994 }
995 };
996
997 Box::pin(st)
998 }
999
1000 pub fn iter<'a, Key, Value>(
1001 &'a mut self,
1002 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1003 where
1004 Key: TableObject + Send + 'a,
1005 Value: TableObject + Send + 'a,
1006 {
1007 Self::stream_iter(self.inner.clone(), MDBX_NEXT, MDBX_NEXT)
1008 }
1009
1010 pub fn into_iter_buffered<'a, Key, Value>(
1011 self,
1012 buffer_config: BufferConfiguration,
1013 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1014 where
1015 Key: TableObject + Send + 'a,
1016 Value: TableObject + Send + 'a,
1017 {
1018 Self::stream_iter_buffered(self.inner.clone(), MDBX_NEXT, MDBX_NEXT, buffer_config)
1019 }
1020
1021 pub fn iter_start<'a, Key, Value>(
1022 &'a mut self,
1023 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1024 where
1025 Key: TableObject + Send + 'a,
1026 Value: TableObject + Send + 'a,
1027 {
1028 Self::stream_iter(self.inner.clone(), MDBX_FIRST, MDBX_NEXT)
1029 }
1030
1031 pub fn into_iter_start_buffered<'a, Key, Value>(
1032 self,
1033 buffer_config: BufferConfiguration,
1034 ) -> Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>
1035 where
1036 Key: TableObject + Send + 'a,
1037 Value: TableObject + Send + 'a,
1038 {
1039 Self::stream_iter_buffered(self.inner.clone(), MDBX_FIRST, MDBX_NEXT, buffer_config)
1040 }
1041
1042 pub async fn iter_from<'a, Key, Value>(
1043 &'a mut self,
1044 key: Vec<u8>,
1045 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1046 where
1047 Key: TableObject + Send + 'a,
1048 Value: TableObject + Send + 'a,
1049 {
1050 let _ = self.set_range::<(), ()>(key).await?;
1051
1052 Ok(Self::stream_iter(
1053 self.inner.clone(),
1054 MDBX_GET_CURRENT,
1055 MDBX_NEXT,
1056 ))
1057 }
1058
1059 pub async fn into_iter_from_buffered<'a, Key, Value>(
1060 mut self,
1061 key: Vec<u8>,
1062 buffer_config: BufferConfiguration,
1063 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1064 where
1065 Key: TableObject + Send + 'a,
1066 Value: TableObject + Send + 'a,
1067 {
1068 let _ = self.set_range::<(), ()>(key).await?;
1069
1070 Ok(Self::stream_iter_buffered(
1071 self.inner.clone(),
1072 MDBX_GET_CURRENT,
1073 MDBX_NEXT,
1074 buffer_config,
1075 ))
1076 }
1077
1078 pub fn iter_dup<'a, Key, Value>(
1079 &'a mut self,
1080 ) -> Pin<
1081 Box<
1082 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1083 + Send
1084 + 'a,
1085 >,
1086 >
1087 where
1088 Key: TableObject + Send + 'a,
1089 Value: TableObject + Send + 'a,
1090 {
1091 Self::stream_iter_dup(self.inner.clone(), MDBX_NEXT)
1092 }
1093
1094 pub fn into_iter_dup_buffered<'a, Key, Value>(
1095 self,
1096 buffer_config: BufferConfiguration,
1097 ) -> Pin<
1098 Box<
1099 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1100 + Send
1101 + 'a,
1102 >,
1103 >
1104 where
1105 Key: TableObject + Send + 'a,
1106 Value: TableObject + Send + 'a,
1107 {
1108 Self::stream_iter_dup_buffered(self.inner.clone(), MDBX_NEXT, buffer_config)
1109 }
1110
1111 pub fn iter_dup_start<'a, Key, Value>(
1112 &'a mut self,
1113 ) -> Pin<
1114 Box<
1115 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1116 + Send
1117 + 'a,
1118 >,
1119 >
1120 where
1121 Key: TableObject + Send + 'a,
1122 Value: TableObject + Send + 'a,
1123 {
1124 Self::stream_iter_dup(self.inner.clone(), MDBX_FIRST)
1125 }
1126
1127 pub fn into_iter_dup_start_buffered<'a, Key, Value>(
1128 self,
1129 buffer_config: BufferConfiguration,
1130 ) -> Pin<
1131 Box<
1132 dyn Stream<Item = Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>>
1133 + Send
1134 + 'a,
1135 >,
1136 >
1137 where
1138 Key: TableObject + Send + 'a,
1139 Value: TableObject + Send + 'a,
1140 {
1141 Self::stream_iter_dup_buffered(self.inner.clone(), MDBX_FIRST, buffer_config)
1142 }
1143
1144 pub async fn iter_dup_from<'a, Key, Value>(
1145 &'a mut self,
1146 key: Vec<u8>,
1147 ) -> Result<
1148 Pin<
1149 Box<
1150 dyn Stream<
1151 Item = Result<
1152 Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
1153 >,
1154 > + Send
1155 + 'a,
1156 >,
1157 >,
1158 >
1159 where
1160 Key: TableObject + Send + 'a,
1161 Value: TableObject + Send + 'a,
1162 {
1163 let _ = self.set_range::<(), ()>(key).await?;
1164 Ok(Self::stream_iter_dup(self.inner.clone(), MDBX_GET_CURRENT))
1165 }
1166
1167 pub async fn into_iter_dup_from_buffered<'a, Key, Value>(
1168 mut self,
1169 key: Vec<u8>,
1170 buffer_config: BufferConfiguration,
1171 ) -> Result<
1172 Pin<
1173 Box<
1174 dyn Stream<
1175 Item = Result<
1176 Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>,
1177 >,
1178 > + Send
1179 + 'a,
1180 >,
1181 >,
1182 >
1183 where
1184 Key: TableObject + Send + 'a,
1185 Value: TableObject + Send + 'a,
1186 {
1187 let _ = self.set_range::<(), ()>(key).await?;
1188 Ok(Self::stream_iter_dup_buffered(
1189 self.inner.clone(),
1190 MDBX_GET_CURRENT,
1191 buffer_config,
1192 ))
1193 }
1194
1195 pub async fn iter_dup_of<'a, Key, Value>(
1196 &'a mut self,
1197 key: Vec<u8>,
1198 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1199 where
1200 Key: TableObject + Send + 'a,
1201 Value: TableObject + Send + 'a,
1202 {
1203 let res = self.set::<()>(key).await?;
1204 if let Some(_) = res {
1205 Ok(Self::stream_iter(
1206 self.inner.clone(),
1207 MDBX_GET_CURRENT,
1208 MDBX_NEXT_DUP,
1209 ))
1210 } else {
1211 let _ = self.last::<(), ()>().await?;
1212 Ok(Self::stream_iter(self.inner.clone(), MDBX_NEXT, MDBX_NEXT))
1213 }
1214 }
1215
1216 pub async fn into_iter_dup_of_buffered<'a, Key, Value>(
1217 mut self,
1218 key: Vec<u8>,
1219 buffer_config: BufferConfiguration,
1220 ) -> Result<Pin<Box<dyn Stream<Item = Result<(Key, Value)>> + Send + 'a>>>
1221 where
1222 Key: TableObject + Send + 'a,
1223 Value: TableObject + Send + 'a,
1224 {
1225 let res = self.set::<()>(key).await?;
1226 if let Some(_) = res {
1227 Ok(Self::stream_iter(
1228 self.inner.clone(),
1229 MDBX_GET_CURRENT,
1230 MDBX_NEXT_DUP,
1231 ))
1232 } else {
1233 let _ = self.last::<(), ()>().await?;
1234 Ok(Self::stream_iter_buffered(
1235 self.inner.clone(),
1236 MDBX_NEXT,
1237 MDBX_NEXT,
1238 buffer_config,
1239 ))
1240 }
1241 }
1242}
1243
1244impl RemoteCursor<RW> {
1245 pub async fn put(&mut self, key: Vec<u8>, data: Vec<u8>, flags: WriteFlags) -> Result<()> {
1246 self.inner
1247 .tx
1248 .env
1249 .cl
1250 .cur_put(
1251 self.inner.tx.env.context(),
1252 self.inner.tx.env.handle,
1253 self.inner.tx.handle,
1254 self.inner.handle,
1255 key,
1256 data,
1257 flags.bits(),
1258 )
1259 .await??;
1260 Ok(())
1261 }
1262
1263 pub async fn del(&mut self, flags: WriteFlags) -> Result<()> {
1264 self.inner
1265 .tx
1266 .env
1267 .cl
1268 .cur_del(
1269 self.inner.tx.env.context(),
1270 self.inner.tx.env.handle,
1271 self.inner.tx.handle,
1272 self.inner.handle,
1273 flags.bits(),
1274 )
1275 .await??;
1276 Ok(())
1277 }
1278}