1use std::{collections::HashMap, path::PathBuf, sync::Arc};
2
3use ffi::{
4 MDBX_FIRST, MDBX_GET_CURRENT, MDBX_LAST, MDBX_NEXT, MDBX_NEXT_DUP, MDBX_NEXT_MULTIPLE,
5 MDBX_NEXT_NODUP, MDBX_PREV, MDBX_PREV_DUP, MDBX_PREV_MULTIPLE, MDBX_PREV_NODUP, MDBX_SET_KEY,
6 MDBX_SET_RANGE,
7};
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10use tokio::sync::RwLock;
11
12use crate::{
13 environment::RemoteEnvironmentConfig, CommitLatency, Cursor, DatabaseFlags, Environment,
14 EnvironmentBuilder, Info, Stat, Transaction, TransactionKind, WriteFlags, RO, RW,
15};
16
/// Cursor operations accepted by `batch_cur_get_full`.
///
/// A request whose `op` is not in this list is rejected with
/// `ServerError::INVALIDGETULL` before any server state is touched.
const ALLOWED_GET_FULL_OPS: &[u32] = &[
    MDBX_NEXT,
    MDBX_NEXT_DUP,
    MDBX_NEXT_MULTIPLE,
    MDBX_NEXT_NODUP,
    MDBX_PREV,
    MDBX_PREV_DUP,
    MDBX_PREV_NODUP,
    MDBX_PREV_MULTIPLE,
    MDBX_FIRST,
    MDBX_GET_CURRENT,
    MDBX_LAST,
    MDBX_SET_KEY,
    MDBX_SET_RANGE,
];
32
#[tarpc::service]
/// RPC surface of the remote MDBX service.
///
/// Environments, transactions and cursors are addressed by `u64` handles
/// issued by the server. Transaction handles are scoped to their environment
/// and cursor handles to their transaction, so a request names every enclosing
/// object (`env`, then `tx`, then `cur`).
pub trait RemoteMDBX {
    /// Opens the environment at `path` with the given configuration and
    /// returns a handle for it.
    async fn open_env(path: PathBuf, builder: RemoteEnvironmentConfig) -> Result<u64, ServerError>;

    /// Begins a read-only transaction in `env` and returns its handle.
    async fn env_ro_tx(env: u64) -> Result<u64, ServerError>;
    /// Begins a read-write transaction in `env` and returns its handle.
    async fn env_rw_tx(env: u64) -> Result<u64, ServerError>;
    /// Calls `sync(force)` on the environment and returns its result.
    async fn env_sync(env: u64, force: bool) -> Result<bool, ServerError>;
    /// Removes `env` and everything registered under it from the server.
    async fn env_close(env: u64) -> Result<(), ServerError>;
    /// Returns the environment's `Stat`.
    async fn env_stat(env: u64) -> Result<Stat, ServerError>;
    /// Returns the environment's `Info`.
    async fn env_info(env: u64) -> Result<Info, ServerError>;

    /// Opens the database named `db` (or the unnamed one for `None`) inside
    /// `tx` and returns its dbi.
    ///
    /// `flags` are raw `DatabaseFlags` bits. On a read-only transaction the
    /// `CREATE` flag is rejected with `NOWRITABLE`.
    async fn tx_create_db(
        env: u64,
        tx: u64,
        db: Option<String>,
        flags: u32,
    ) -> Result<u32, ServerError>;
    /// Reads the value stored under `key` in `dbi`, using either kind of
    /// transaction.
    async fn tx_get(
        env: u64,
        tx: u64,
        dbi: u32,
        key: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, ServerError>;
    /// Stores `key`/`value` in `dbi` through read-write transaction `tx`.
    /// `flags` are raw `WriteFlags` bits.
    async fn tx_put(
        env: u64,
        tx: u64,
        dbi: u32,
        key: Vec<u8>,
        value: Vec<u8>,
        flags: u32,
    ) -> Result<(), ServerError>;
    /// Deletes `key` (optionally restricted to `value`) from `dbi` through
    /// read-write transaction `tx`; the `bool` is whatever `Transaction::del`
    /// reports.
    async fn tx_del(
        env: u64,
        tx: u64,
        dbi: u32,
        key: Vec<u8>,
        value: Option<Vec<u8>>,
    ) -> Result<bool, ServerError>;
    /// Opens a cursor on `dbi` in read-only transaction `tx`.
    async fn tx_ro_cursor(env: u64, tx: u64, dbi: u32) -> Result<u64, ServerError>;
    /// Opens a cursor on `dbi` in read-write transaction `tx`.
    async fn tx_rw_cursor(env: u64, tx: u64, dbi: u32) -> Result<u64, ServerError>;
    /// Commits read-write transaction `tx`; its handle is released even when
    /// the commit itself fails.
    async fn tx_commit(env: u64, tx: u64) -> Result<(bool, CommitLatency), ServerError>;
    /// Drops transaction `tx` (read-only or read-write) without committing.
    async fn tx_abort(env: u64, tx: u64) -> Result<(), ServerError>;
    /// Begins a transaction nested in read-write transaction `tx` and returns
    /// the new handle.
    async fn tx_nested(env: u64, tx: u64) -> Result<u64, ServerError>;
    /// Returns the `Stat` of database `dbi` as seen by `tx`.
    async fn tx_db_stat(env: u64, tx: u64, dbi: u32) -> Result<Stat, ServerError>;
    /// Clears database `dbi` through read-write transaction `tx`.
    async fn clear_db(env: u64, tx: u64, dbi: u32) -> Result<(), ServerError>;

    /// Runs cursor operation `op` on `cur` with the optional `key`/`data`
    /// inputs and returns the tuple produced by `Cursor::get` unchanged.
    async fn cur_get(
        env: u64,
        tx: u64,
        cur: u64,
        key: Option<Vec<u8>>,
        data: Option<Vec<u8>>,
        op: u32,
    ) -> Result<(Option<Vec<u8>>, Vec<u8>, bool), ServerError>;
    /// Writes `key`/`value` through cursor `cur` of a read-write transaction.
    /// `flags` are raw `WriteFlags` bits.
    async fn cur_put(
        env: u64,
        tx: u64,
        cur: u64,
        key: Vec<u8>,
        value: Vec<u8>,
        flags: u32,
    ) -> Result<(), ServerError>;
    /// Clones the existing cursor `cur` and returns the handle of the copy.
    async fn cur_create(env: u64, tx: u64, cur: u64) -> Result<u64, ServerError>;
    /// Calls `del` on cursor `cur` of a read-write transaction. `flags` are
    /// raw `WriteFlags` bits.
    async fn cur_del(env: u64, tx: u64, cur: u64, flags: u32) -> Result<(), ServerError>;
    /// Releases cursor `cur`.
    async fn cur_close(env: u64, tx: u64, cur: u64) -> Result<(), ServerError>;

    /// Applies `op` to `cur` up to `cnt` times and returns the collected
    /// key/value pairs.
    ///
    /// Collection also stops once the accumulated key and value bytes reach
    /// `buffer`, or when the cursor reports that there is no more data.
    async fn batch_cur_get_full(
        env: u64,
        tx: u64,
        cur: u64,
        cnt: u64,
        buffer: u64,
        op: u32,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServerError>;
}
109
/// Server-side bookkeeping for one open transaction.
#[derive(Debug)]
struct LocalTransactionState<K: TransactionKind> {
    /// The transaction handle itself.
    tx: Transaction<K>,
    /// Cursors opened on this transaction, keyed by the handle given to the client.
    cursors: HashMap<u64, Cursor<K>>,
    /// Starting point for the search for the next unused cursor handle.
    next_cur_id: u64,
}
116
117impl<K: TransactionKind> LocalTransactionState<K> {
118 pub fn next_id(&mut self) -> u64 {
119 while self.cursors.contains_key(&self.next_cur_id) {
120 self.next_cur_id = self.next_cur_id.wrapping_add(1);
121 }
122
123 self.next_cur_id
124 }
125
126 pub fn cur_clone(&mut self, cur: u64) -> Option<u64> {
127 let cur = self.cursors.get_mut(&cur)?;
128 let new_cur = cur.clone();
129 let new_id = self.next_id();
130 self.cursors.insert(new_id, new_cur);
131 Some(new_id)
132 }
133}
134
/// Server-side bookkeeping for one open environment.
#[derive(Debug)]
struct DatabaseEnvState {
    /// Open read-only transactions, keyed by client-visible handle.
    rotxs: HashMap<u64, LocalTransactionState<RO>>,
    /// Open read-write transactions, keyed by client-visible handle.
    rwtxs: HashMap<u64, LocalTransactionState<RW>>,
    /// The environment handle.
    env: Environment,
    /// Starting point for the search for the next unused transaction handle;
    /// the handle space is shared by `rotxs` and `rwtxs`.
    next_tx_id: u64,
}
142
143impl DatabaseEnvState {
144 fn next_id(&mut self) -> u64 {
145 while self.rotxs.contains_key(&self.next_tx_id) || self.rwtxs.contains_key(&self.next_tx_id)
146 {
147 self.next_tx_id = self.next_tx_id.wrapping_add(1);
148 }
149
150 self.next_tx_id
151 }
152}
153
/// Everything the server currently has open, indexed by environment handle.
#[derive(Debug, Default)]
pub struct MDBXServerState {
    /// Open environments, keyed by client-visible handle.
    envs: HashMap<u64, DatabaseEnvState>,
    /// Starting point for the search for the next unused environment handle.
    next_env_id: u64,
}
159
160impl MDBXServerState {
161 fn next_id(&mut self) -> u64 {
162 while self.envs.contains_key(&self.next_env_id) {
163 self.next_env_id = self.next_env_id.wrapping_add(1);
164 }
165 self.next_env_id
166 }
167}
168
/// Errors the remote MDBX service reports to its clients.
#[derive(Clone, Debug, Error, Serialize, Deserialize)]
pub enum ServerError {
    /// An error produced by the underlying MDBX layer.
    // NOTE(review): the name looks like a transposition of `MDBX`; renaming it
    // would change the public (and serialized) API, so confirm before touching.
    #[error("mdbx error: {0}")]
    MBDX(crate::error::Error),
    /// The environment handle is not registered.
    #[error("no such env")]
    NOENV,
    /// The transaction handle is not registered (for write operations: not
    /// registered as a read-write transaction).
    #[error("no such tx")]
    NOTX,
    /// The cursor handle is not registered.
    #[error("no such cursor")]
    NOCURSOR,
    /// The raw flag bits do not form a valid flag set.
    #[error("incorrect flag")]
    INCORRECTFLAG,
    /// The path given to `open_env` could not be made absolute.
    #[error("fail to get absolute path")]
    NOPATH,
    /// A database was to be created through a read-only transaction.
    #[error("not writable")]
    NOWRITABLE,
    /// A task on the blocking pool could not be joined; carries the join
    /// error's message.
    #[error("tokio: {0}")]
    TOKIO(String),
    /// `batch_cur_get_full` was called with a disallowed cursor op, or the
    /// cursor returned an entry without a key; carries the op.
    // NOTE(review): presumably meant to read `INVALIDGETFULL` — same caveat
    // about renaming a public variant applies.
    #[error("invalid get_full: {0}")]
    INVALIDGETULL(u32),
}
190
191impl From<tokio::task::JoinError> for ServerError {
192 fn from(value: tokio::task::JoinError) -> Self {
193 Self::TOKIO(value.to_string())
194 }
195}
196
197impl From<crate::error::Error> for ServerError {
198 fn from(value: crate::error::Error) -> Self {
199 Self::MBDX(value.into())
200 }
201}
202
/// tarpc handler implementing `RemoteMDBX`.
///
/// Cloning is cheap and every clone shares the same state through the `Arc`.
#[derive(Debug, Clone)]
pub struct RemoteMDBXServer {
    /// All open environments, transactions and cursors, behind an async
    /// reader/writer lock.
    state: Arc<RwLock<MDBXServerState>>,
}
207
208impl RemoteMDBXServer {
209 pub fn new() -> Self {
210 Self {
211 state: Arc::new(RwLock::new(MDBXServerState::default())),
212 }
213 }
214}
215
216impl RemoteMDBX for RemoteMDBXServer {
217 async fn open_env(
218 self,
219 _context: tarpc::context::Context,
220 path: PathBuf,
221 cfg: RemoteEnvironmentConfig,
222 ) -> Result<u64, ServerError> {
223 let abs_path = std::path::absolute(path).map_err(|_| ServerError::NOPATH)?;
224 let handle = {
225 let builder = EnvironmentBuilder::from(cfg);
226 let env = tokio::task::spawn_blocking(move || builder.open(&abs_path)).await??;
227 let mut lg = self.state.write().await;
228 let handle = lg.next_id();
229
230 lg.envs.insert(
231 handle,
232 DatabaseEnvState {
233 rotxs: HashMap::new(),
234 rwtxs: HashMap::new(),
235 env: env,
236 next_tx_id: 0,
237 },
238 );
239 handle
240 };
241
242 Ok(handle)
243 }
244
245 async fn env_close(
246 self,
247 _context: tarpc::context::Context,
248 env: u64,
249 ) -> Result<(), ServerError> {
250 let mut lg = self.state.write().await;
251 lg.envs.remove(&env);
252 Ok(())
253 }
254
255 async fn env_stat(
256 self,
257 _context: tarpc::context::Context,
258 env: u64,
259 ) -> Result<Stat, ServerError> {
260 let env = self
261 .state
262 .read()
263 .await
264 .envs
265 .get(&env)
266 .ok_or(ServerError::NOENV)?
267 .env
268 .clone();
269 let ret = env.stat()?;
270
271 Ok(ret)
272 }
273
274 async fn env_info(
275 self,
276 _context: tarpc::context::Context,
277 env: u64,
278 ) -> Result<Info, ServerError> {
279 let env = self
280 .state
281 .read()
282 .await
283 .envs
284 .get(&env)
285 .ok_or(ServerError::NOENV)?
286 .env
287 .clone();
288 let ret = env.info()?;
289
290 Ok(ret)
291 }
292
293 async fn env_sync(
294 self,
295 _context: tarpc::context::Context,
296 env: u64,
297 force: bool,
298 ) -> Result<bool, ServerError> {
299 let env = self
300 .state
301 .read()
302 .await
303 .envs
304 .get(&env)
305 .ok_or(ServerError::NOENV)?
306 .env
307 .clone();
308 let ret = tokio::task::spawn_blocking(move || env.sync(force)).await??;
310
311 Ok(ret)
312 }
313
314 async fn env_ro_tx(
315 self,
316 _context: tarpc::context::Context,
317 env: u64,
318 ) -> Result<u64, ServerError> {
319 let env_clone = self
320 .state
321 .read()
322 .await
323 .envs
324 .get(&env)
325 .ok_or(ServerError::NOENV)?
326 .env
327 .clone();
328 let tx = tokio::task::spawn_blocking(move || env_clone.begin_ro_txn()).await??;
330
331 let mut lg = self.state.write().await;
333 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
334 let tx_id = env.next_id();
335 env.rotxs.insert(
336 tx_id,
337 LocalTransactionState {
338 tx: tx,
339 cursors: HashMap::new(),
340 next_cur_id: 0,
341 },
342 );
343
344 Ok(tx_id)
345 }
346
347 async fn env_rw_tx(
348 self,
349 _context: tarpc::context::Context,
350 env: u64,
351 ) -> Result<u64, ServerError> {
352 let env_clone = self
353 .state
354 .read()
355 .await
356 .envs
357 .get(&env)
358 .ok_or(ServerError::NOENV)?
359 .env
360 .clone();
361 let tx = tokio::task::spawn_blocking(move || env_clone.begin_rw_txn()).await??;
363
364 let mut lg = self.state.write().await;
366 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
367 let tx_id = env.next_id();
368 env.rwtxs.insert(
369 tx_id,
370 LocalTransactionState {
371 tx: tx,
372 cursors: HashMap::new(),
373 next_cur_id: 0,
374 },
375 );
376 Ok(tx_id)
377 }
378
379 async fn tx_create_db(
380 self,
381 _context: tarpc::context::Context,
382 env: u64,
383 tx: u64,
384 db: Option<String>,
385 flags: u32,
386 ) -> Result<u32, ServerError> {
387 let flags = DatabaseFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
388
389 let lg = self.state.read().await;
390 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
391
392 let db = if let Some(tx) = env.rwtxs.get(&tx) {
393 let tx = tx.tx.clone();
394 drop(lg);
395 tokio::task::spawn_blocking(move || tx.open_db_with_flags(db.as_deref(), flags))
396 .await??
397 } else if let Some(tx) = env.rotxs.get(&tx) {
398 let tx = tx.tx.clone();
399 drop(lg);
400 if flags.contains(DatabaseFlags::CREATE) {
401 return Err(ServerError::NOWRITABLE);
402 }
403 tokio::task::spawn_blocking(move || tx.open_db(db.as_deref())).await??
406 } else {
407 return Err(ServerError::NOTX);
408 };
409
410 Ok(db.dbi())
411 }
412
413 async fn tx_get(
414 self,
415 _context: tarpc::context::Context,
416 env: u64,
417 tx: u64,
418 dbi: u32,
419 key: Vec<u8>,
420 ) -> Result<Option<Vec<u8>>, ServerError> {
421 let lg = self.state.read().await;
422 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
423
424 let val = if let Some(tx) = env.rwtxs.get(&tx) {
425 let tx = tx.tx.clone();
426 drop(lg);
427 tx.get::<Vec<u8>>(dbi, &key)?
428 } else if let Some(ro) = env.rotxs.get(&tx) {
429 let tx_clone = ro.tx.clone();
430 drop(lg);
431 tx_clone.get::<Vec<u8>>(dbi, &key)?
432 } else {
433 return Err(ServerError::NOTX);
434 };
435 Ok(val)
436 }
437
438 async fn tx_put(
439 self,
440 _context: tarpc::context::Context,
441 env: u64,
442 tx: u64,
443 dbi: u32,
444 key: Vec<u8>,
445 value: Vec<u8>,
446 flags: u32,
447 ) -> Result<(), ServerError> {
448 let flags = WriteFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
449 let lg = self.state.read().await;
450
451 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
452
453 if let Some(tx) = env.rwtxs.get(&tx) {
454 let tx = tx.tx.clone();
455 drop(lg);
456 tx.put(dbi, &key, &value, flags)?;
457 } else {
458 return Err(ServerError::NOTX);
459 }
460
461 Ok(())
462 }
463
464 async fn tx_del(
465 self,
466 _context: tarpc::context::Context,
467 env: u64,
468 tx: u64,
469 dbi: u32,
470 key: Vec<u8>,
471 value: Option<Vec<u8>>,
472 ) -> Result<bool, ServerError> {
473 let lg = self.state.read().await;
474
475 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
476
477 let ret = if let Some(tx) = env.rwtxs.get(&tx) {
478 let tx = tx.tx.clone();
479 drop(lg);
480 tx.del(dbi, &key, value.as_ref().map(|t| t.as_slice()))?
481 } else {
482 return Err(ServerError::NOTX);
483 };
484
485 Ok(ret)
486 }
487
488 async fn tx_commit(
489 self,
490 _context: tarpc::context::Context,
491 env: u64,
492 tx: u64,
493 ) -> Result<(bool, CommitLatency), ServerError> {
494 let tx = {
495 let mut lg = self.state.write().await;
496 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
497 env.rwtxs.remove(&tx).ok_or(ServerError::NOTX)?
498 };
499
500 Ok(tokio::task::spawn_blocking(move || tx.tx.commit()).await??)
502 }
503
504 async fn tx_abort(
505 self,
506 _context: tarpc::context::Context,
507 env: u64,
508 tx: u64,
509 ) -> Result<(), ServerError> {
510 let (rw, ro) = {
511 let mut lg = self.state.write().await;
512
513 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
514
515 (env.rwtxs.remove(&tx), env.rotxs.remove(&tx))
516 };
517
518 if let Some(rw) = rw {
519 drop(rw.tx);
520 } else if let Some(ro) = ro {
521 drop(ro.tx);
522 } else {
523 return Err(ServerError::NOTX);
524 }
525
526 Ok(())
527 }
528
529 async fn tx_nested(
530 self,
531 _context: tarpc::context::Context,
532 env: u64,
533 tx: u64,
534 ) -> Result<u64, ServerError> {
535 let mut tx = self
536 .state
537 .read()
538 .await
539 .envs
540 .get(&env)
541 .ok_or(ServerError::NOENV)?
542 .rwtxs
543 .get(&tx)
544 .ok_or(ServerError::NOTX)?
545 .tx
546 .clone();
547
548 let new_tx = tokio::task::spawn_blocking(move || tx.begin_nested_txn()).await??;
550
551 let mut lg = self.state.write().await;
552 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
553 let id = env.next_id();
554 env.rwtxs.insert(
555 id,
556 LocalTransactionState {
557 tx: new_tx,
558 cursors: HashMap::new(),
559 next_cur_id: 0,
560 },
561 );
562
563 Ok(id)
564 }
565
566 async fn tx_db_stat(
567 self,
568 _context: tarpc::context::Context,
569 env: u64,
570 tx: u64,
571 dbi: u32,
572 ) -> Result<Stat, ServerError> {
573 let lg = self.state.read().await;
574
575 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
576
577 let stat = if let Some(rw) = env.rwtxs.get(&tx) {
578 let tx = rw.tx.clone();
579 drop(lg);
580 tx.db_stat_with_dbi(dbi)?
581 } else if let Some(ro) = env.rotxs.get(&tx) {
582 let tx = ro.tx.clone();
583 drop(lg);
584 tx.db_stat_with_dbi(dbi)?
585 } else {
586 return Err(ServerError::NOTX);
587 };
588
589 Ok(stat)
590 }
591
592 async fn clear_db(
593 self,
594 _context: tarpc::context::Context,
595 env: u64,
596 tx: u64,
597 dbi: u32,
598 ) -> Result<(), ServerError> {
599 let tx = self
600 .state
601 .read()
602 .await
603 .envs
604 .get(&env)
605 .ok_or(ServerError::NOENV)?
606 .rwtxs
607 .get(&tx)
608 .ok_or(ServerError::NOTX)?
609 .tx
610 .clone();
611
612 tokio::task::spawn_blocking(move || tx.clear_db(dbi)).await??;
614 Ok(())
615 }
616
617 async fn tx_ro_cursor(
618 self,
619 _context: tarpc::context::Context,
620 env: u64,
621 tx: u64,
622 dbi: u32,
623 ) -> Result<u64, ServerError> {
624 let tx_clone = self
625 .state
626 .read()
627 .await
628 .envs
629 .get(&env)
630 .ok_or(ServerError::NOENV)?
631 .rotxs
632 .get(&tx)
633 .ok_or(ServerError::NOTX)?
634 .tx
635 .clone();
636 let cur = tokio::task::spawn_blocking(move || tx_clone.cursor_with_dbi(dbi)).await??;
637 let mut lg = self.state.write().await;
638 let tx_mut = lg
639 .envs
640 .get_mut(&env)
641 .ok_or(ServerError::NOENV)?
642 .rotxs
643 .get_mut(&tx)
644 .ok_or(ServerError::NOTX)?;
645
646 let cur_id = tx_mut.next_id();
647 tx_mut.cursors.insert(cur_id, cur);
648 return Ok(cur_id);
649 }
650
651 async fn tx_rw_cursor(
652 self,
653 _context: tarpc::context::Context,
654 env: u64,
655 tx: u64,
656 dbi: u32,
657 ) -> Result<u64, ServerError> {
658 let tx_clone = self
659 .state
660 .read()
661 .await
662 .envs
663 .get(&env)
664 .ok_or(ServerError::NOENV)?
665 .rwtxs
666 .get(&tx)
667 .ok_or(ServerError::NOTX)?
668 .tx
669 .clone();
670 let cur = tokio::task::spawn_blocking(move || tx_clone.cursor_with_dbi(dbi)).await??;
671 let mut lg = self.state.write().await;
672 let tx_mut = lg
673 .envs
674 .get_mut(&env)
675 .ok_or(ServerError::NOENV)?
676 .rwtxs
677 .get_mut(&tx)
678 .ok_or(ServerError::NOTX)?;
679
680 let cur_id = tx_mut.next_id();
681 tx_mut.cursors.insert(cur_id, cur);
682 return Ok(cur_id);
683 }
684
685 async fn cur_create(
686 self,
687 _context: tarpc::context::Context,
688 env: u64,
689 tx: u64,
690 cur: u64,
691 ) -> Result<u64, ServerError> {
692 let mut lg = self.state.write().await;
695
696 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
697
698 if let Some(tx) = env.rwtxs.get_mut(&tx) {
699 let new_cur = tx.cur_clone(cur).ok_or(ServerError::NOCURSOR)?;
700 return Ok(new_cur);
701 } else if let Some(tx) = env.rotxs.get_mut(&tx) {
702 let new_cur = tx.cur_clone(cur).ok_or(ServerError::NOCURSOR)?;
703 return Ok(new_cur);
704 } else {
705 return Err(ServerError::NOTX);
706 };
707 }
708
709 async fn cur_get(
710 self,
711 _context: tarpc::context::Context,
712 env: u64,
713 tx: u64,
714 cur: u64,
715 key: Option<Vec<u8>>,
716 data: Option<Vec<u8>>,
717 op: u32,
718 ) -> Result<(Option<Vec<u8>>, Vec<u8>, bool), ServerError> {
719 let op = mdbx_remote_sys::MDBX_cursor_op::try_from(op)
720 .map_err(|_| crate::error::Error::DecodeError)?;
721 let lg = self.state.read().await;
722
723 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
724
725 let val = if let Some(tx) = env.rwtxs.get(&tx) {
727 tx.cursors
728 .get(&cur)
729 .ok_or(ServerError::NOCURSOR)?
730 .get::<Vec<u8>, Vec<u8>>(
731 key.as_ref().map(|t| t.as_slice()),
732 data.as_ref().map(|t| t.as_slice()),
733 op,
734 )?
735 } else if let Some(tx) = env.rotxs.get(&tx) {
736 tx.cursors
737 .get(&cur)
738 .ok_or(ServerError::NOCURSOR)?
739 .get::<Vec<u8>, Vec<u8>>(
740 key.as_ref().map(|t| t.as_slice()),
741 data.as_ref().map(|t| t.as_slice()),
742 op,
743 )?
744 } else {
745 return Err(ServerError::NOTX);
746 };
747 Ok(val)
749 }
750
751 async fn cur_put(
752 self,
753 _context: tarpc::context::Context,
754 env: u64,
755 tx: u64,
756 cur: u64,
757 key: Vec<u8>,
758 value: Vec<u8>,
759 flags: u32,
760 ) -> Result<(), ServerError> {
761 let op = WriteFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
763 let mut lg = self.state.write().await;
764
765 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
766
767 if let Some(tx) = env.rwtxs.get_mut(&tx) {
768 tx.cursors
769 .get_mut(&cur)
770 .ok_or(ServerError::NOCURSOR)?
771 .put(&key, &value, op)?;
772 } else {
773 return Err(ServerError::NOTX);
774 }
775
776 Ok(())
778 }
779
780 async fn cur_del(
781 self,
782 _context: tarpc::context::Context,
783 env: u64,
784 tx: u64,
785 cur: u64,
786 flags: u32,
787 ) -> Result<(), ServerError> {
788 let op = WriteFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
790 let mut lg = self.state.write().await;
791
792 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
793
794 if let Some(tx) = env.rwtxs.get_mut(&tx) {
795 tx.cursors
796 .get_mut(&cur)
797 .ok_or(ServerError::NOCURSOR)?
798 .del(op)?;
799 } else {
800 return Err(ServerError::NOTX);
801 }
802 Ok(())
804 }
805
806 async fn cur_close(
807 self,
808 _context: tarpc::context::Context,
809 env: u64,
810 tx: u64,
811 cur: u64,
812 ) -> Result<(), ServerError> {
813 let (rw, ro) = {
814 let mut lg = self.state.write().await;
815
816 let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
817
818 (
819 env.rwtxs.get_mut(&tx).and_then(|t| t.cursors.remove(&cur)),
820 env.rotxs.get_mut(&tx).and_then(|t| t.cursors.remove(&cur)),
821 )
822 };
823
824 if let Some(rw) = rw {
825 drop(rw);
826 } else if let Some(ro) = ro {
827 drop(ro);
828 } else {
829 return Err(ServerError::NOCURSOR);
830 }
831
832 Ok(())
833 }
834
835 async fn batch_cur_get_full(
836 self,
837 _context: tarpc::context::Context,
838 env: u64,
839 tx: u64,
840 cur: u64,
841 cnt: u64,
842 buffer: u64,
843 op: u32,
844 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServerError> {
845 let op = mdbx_remote_sys::MDBX_cursor_op::try_from(op)
846 .map_err(|_| crate::error::Error::DecodeError)?;
847
848 if !ALLOWED_GET_FULL_OPS.contains(&op) {
849 return Err(ServerError::INVALIDGETULL(op));
850 }
851 let lg = self.state.read().await;
852
853 let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
854
855 if let Some(tx) = env.rwtxs.get(&tx) {
857 let cur = tx.cursors.get(&cur).ok_or(ServerError::NOCURSOR)?;
858
859 return Self::batch_cur_get_full_impl(cur, cnt, buffer, op);
860 } else if let Some(tx) = env.rotxs.get(&tx) {
861 let cur = tx.cursors.get(&cur).ok_or(ServerError::NOCURSOR)?;
862 return Self::batch_cur_get_full_impl(cur, cnt, buffer, op);
863 } else {
864 return Err(ServerError::NOTX);
865 }
866 }
867}
868
869impl RemoteMDBXServer {
870 fn batch_cur_get_full_impl<K: TransactionKind>(
871 cur: &Cursor<K>,
872 cnt: u64,
873 buffer: u64,
874 op: u32,
875 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServerError> {
876 let mut out = Vec::new();
877 let mut current_size = 0;
878 for _ in 0..cnt {
879 if current_size >= buffer {
880 break;
881 }
882
883 match cur.get::<Vec<u8>, Vec<u8>>(None, None, op) {
884 Ok((k, v, _)) => {
885 let key = k.ok_or(ServerError::INVALIDGETULL(op))?;
886 current_size += key.len() as u64 + v.len() as u64;
887 out.push((key, v));
888 }
889 Err(crate::Error::NoData | crate::Error::NotFound) => return Ok(out),
890 Err(e) => {
891 return Err(e.into());
892 }
893 }
894 }
895
896 Ok(out)
897 }
898}