libmdbx_remote/
service.rs

1use std::{collections::HashMap, path::PathBuf, sync::Arc};
2
3use ffi::{
4    MDBX_FIRST, MDBX_GET_CURRENT, MDBX_LAST, MDBX_NEXT, MDBX_NEXT_DUP, MDBX_NEXT_MULTIPLE,
5    MDBX_NEXT_NODUP, MDBX_PREV, MDBX_PREV_DUP, MDBX_PREV_MULTIPLE, MDBX_PREV_NODUP, MDBX_SET_KEY,
6    MDBX_SET_RANGE,
7};
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10use tokio::sync::RwLock;
11
12use crate::{
13    environment::RemoteEnvironmentConfig, CommitLatency, Cursor, DatabaseFlags, Environment,
14    EnvironmentBuilder, Info, Stat, Transaction, TransactionKind, WriteFlags, RO, RW,
15};
16
17const ALLOWED_GET_FULL_OPS: &[u32] = &[
18    MDBX_NEXT,
19    MDBX_NEXT_DUP,
20    MDBX_NEXT_MULTIPLE,
21    MDBX_NEXT_NODUP,
22    MDBX_PREV,
23    MDBX_PREV_DUP,
24    MDBX_PREV_NODUP,
25    MDBX_PREV_MULTIPLE,
26    MDBX_FIRST,
27    MDBX_GET_CURRENT,
28    MDBX_LAST,
29    MDBX_SET_KEY,
30    MDBX_SET_RANGE,
31];
32
33#[tarpc::service]
34pub trait RemoteMDBX {
35    async fn open_env(path: PathBuf, builder: RemoteEnvironmentConfig) -> Result<u64, ServerError>;
36
37    async fn env_ro_tx(env: u64) -> Result<u64, ServerError>;
38    async fn env_rw_tx(env: u64) -> Result<u64, ServerError>;
39    async fn env_sync(env: u64, force: bool) -> Result<bool, ServerError>;
40    async fn env_close(env: u64) -> Result<(), ServerError>;
41    async fn env_stat(env: u64) -> Result<Stat, ServerError>;
42    async fn env_info(env: u64) -> Result<Info, ServerError>;
43
44    async fn tx_create_db(
45        env: u64,
46        tx: u64,
47        db: Option<String>,
48        flags: u32,
49    ) -> Result<u32, ServerError>; // dbi
50    async fn tx_get(
51        env: u64,
52        tx: u64,
53        dbi: u32,
54        key: Vec<u8>,
55    ) -> Result<Option<Vec<u8>>, ServerError>;
56    async fn tx_put(
57        env: u64,
58        tx: u64,
59        dbi: u32,
60        key: Vec<u8>,
61        value: Vec<u8>,
62        flags: u32,
63    ) -> Result<(), ServerError>;
64    async fn tx_del(
65        env: u64,
66        tx: u64,
67        dbi: u32,
68        key: Vec<u8>,
69        value: Option<Vec<u8>>,
70    ) -> Result<bool, ServerError>;
71    async fn tx_ro_cursor(env: u64, tx: u64, dbi: u32) -> Result<u64, ServerError>;
72    async fn tx_rw_cursor(env: u64, tx: u64, dbi: u32) -> Result<u64, ServerError>;
73    async fn tx_commit(env: u64, tx: u64) -> Result<(bool, CommitLatency), ServerError>;
74    async fn tx_abort(env: u64, tx: u64) -> Result<(), ServerError>;
75    async fn tx_nested(env: u64, tx: u64) -> Result<u64, ServerError>;
76    async fn tx_db_stat(env: u64, tx: u64, dbi: u32) -> Result<Stat, ServerError>;
77    async fn clear_db(env: u64, tx: u64, dbi: u32) -> Result<(), ServerError>;
78
79    async fn cur_get(
80        env: u64,
81        tx: u64,
82        cur: u64,
83        key: Option<Vec<u8>>,
84        data: Option<Vec<u8>>,
85        op: u32,
86    ) -> Result<(Option<Vec<u8>>, Vec<u8>, bool), ServerError>;
87    async fn cur_put(
88        env: u64,
89        tx: u64,
90        cur: u64,
91        key: Vec<u8>,
92        value: Vec<u8>,
93        flags: u32,
94    ) -> Result<(), ServerError>;
95    async fn cur_create(env: u64, tx: u64, cur: u64) -> Result<u64, ServerError>;
96    async fn cur_del(env: u64, tx: u64, cur: u64, flags: u32) -> Result<(), ServerError>;
97    async fn cur_close(env: u64, tx: u64, cur: u64) -> Result<(), ServerError>;
98
99    // Our custom primitives
100    async fn batch_cur_get_full(
101        env: u64,
102        tx: u64,
103        cur: u64,
104        cnt: u64,
105        buffer: u64,
106        op: u32,
107    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServerError>;
108}
109
110#[derive(Debug)]
111struct LocalTransactionState<K: TransactionKind> {
112    tx: Transaction<K>,
113    cursors: HashMap<u64, Cursor<K>>,
114    next_cur_id: u64,
115}
116
117impl<K: TransactionKind> LocalTransactionState<K> {
118    pub fn next_id(&mut self) -> u64 {
119        while self.cursors.contains_key(&self.next_cur_id) {
120            self.next_cur_id = self.next_cur_id.wrapping_add(1);
121        }
122
123        self.next_cur_id
124    }
125
126    pub fn cur_clone(&mut self, cur: u64) -> Option<u64> {
127        let cur = self.cursors.get_mut(&cur)?;
128        let new_cur = cur.clone();
129        let new_id = self.next_id();
130        self.cursors.insert(new_id, new_cur);
131        Some(new_id)
132    }
133}
134
135#[derive(Debug)]
136struct DatabaseEnvState {
137    rotxs: HashMap<u64, LocalTransactionState<RO>>,
138    rwtxs: HashMap<u64, LocalTransactionState<RW>>,
139    env: Environment,
140    next_tx_id: u64,
141}
142
143impl DatabaseEnvState {
144    fn next_id(&mut self) -> u64 {
145        while self.rotxs.contains_key(&self.next_tx_id) || self.rwtxs.contains_key(&self.next_tx_id)
146        {
147            self.next_tx_id = self.next_tx_id.wrapping_add(1);
148        }
149
150        self.next_tx_id
151    }
152}
153
154#[derive(Debug, Default)]
155pub struct MDBXServerState {
156    envs: HashMap<u64, DatabaseEnvState>,
157    next_env_id: u64,
158}
159
160impl MDBXServerState {
161    fn next_id(&mut self) -> u64 {
162        while self.envs.contains_key(&self.next_env_id) {
163            self.next_env_id = self.next_env_id.wrapping_add(1);
164        }
165        self.next_env_id
166    }
167}
168
169#[derive(Clone, Debug, Error, Serialize, Deserialize)]
170pub enum ServerError {
171    #[error("mdbx error: {0}")]
172    MBDX(crate::error::Error),
173    #[error("no such env")]
174    NOENV,
175    #[error("no such tx")]
176    NOTX,
177    #[error("no such cursor")]
178    NOCURSOR,
179    #[error("incorrect flag")]
180    INCORRECTFLAG,
181    #[error("fail to get absolute path")]
182    NOPATH,
183    #[error("not writable")]
184    NOWRITABLE,
185    #[error("tokio: {0}")]
186    TOKIO(String),
187    #[error("invalid get_full: {0}")]
188    INVALIDGETULL(u32),
189}
190
191impl From<tokio::task::JoinError> for ServerError {
192    fn from(value: tokio::task::JoinError) -> Self {
193        Self::TOKIO(value.to_string())
194    }
195}
196
197impl From<crate::error::Error> for ServerError {
198    fn from(value: crate::error::Error) -> Self {
199        Self::MBDX(value.into())
200    }
201}
202
203#[derive(Debug, Clone)]
204pub struct RemoteMDBXServer {
205    state: Arc<RwLock<MDBXServerState>>,
206}
207
208impl RemoteMDBXServer {
209    pub fn new() -> Self {
210        Self {
211            state: Arc::new(RwLock::new(MDBXServerState::default())),
212        }
213    }
214}
215
216impl RemoteMDBX for RemoteMDBXServer {
217    async fn open_env(
218        self,
219        _context: tarpc::context::Context,
220        path: PathBuf,
221        cfg: RemoteEnvironmentConfig,
222    ) -> Result<u64, ServerError> {
223        let abs_path = std::path::absolute(path).map_err(|_| ServerError::NOPATH)?;
224        let handle = {
225            let builder = EnvironmentBuilder::from(cfg);
226            let env = tokio::task::spawn_blocking(move || builder.open(&abs_path)).await??;
227            let mut lg = self.state.write().await;
228            let handle = lg.next_id();
229
230            lg.envs.insert(
231                handle,
232                DatabaseEnvState {
233                    rotxs: HashMap::new(),
234                    rwtxs: HashMap::new(),
235                    env: env,
236                    next_tx_id: 0,
237                },
238            );
239            handle
240        };
241
242        Ok(handle)
243    }
244
245    async fn env_close(
246        self,
247        _context: tarpc::context::Context,
248        env: u64,
249    ) -> Result<(), ServerError> {
250        let mut lg = self.state.write().await;
251        lg.envs.remove(&env);
252        Ok(())
253    }
254
255    async fn env_stat(
256        self,
257        _context: tarpc::context::Context,
258        env: u64,
259    ) -> Result<Stat, ServerError> {
260        let env = self
261            .state
262            .read()
263            .await
264            .envs
265            .get(&env)
266            .ok_or(ServerError::NOENV)?
267            .env
268            .clone();
269        let ret = env.stat()?;
270
271        Ok(ret)
272    }
273
274    async fn env_info(
275        self,
276        _context: tarpc::context::Context,
277        env: u64,
278    ) -> Result<Info, ServerError> {
279        let env = self
280            .state
281            .read()
282            .await
283            .envs
284            .get(&env)
285            .ok_or(ServerError::NOENV)?
286            .env
287            .clone();
288        let ret = env.info()?;
289
290        Ok(ret)
291    }
292
293    async fn env_sync(
294        self,
295        _context: tarpc::context::Context,
296        env: u64,
297        force: bool,
298    ) -> Result<bool, ServerError> {
299        let env = self
300            .state
301            .read()
302            .await
303            .envs
304            .get(&env)
305            .ok_or(ServerError::NOENV)?
306            .env
307            .clone();
308        // This can block for a very time, though not forever
309        let ret = tokio::task::spawn_blocking(move || env.sync(force)).await??;
310
311        Ok(ret)
312    }
313
314    async fn env_ro_tx(
315        self,
316        _context: tarpc::context::Context,
317        env: u64,
318    ) -> Result<u64, ServerError> {
319        let env_clone = self
320            .state
321            .read()
322            .await
323            .envs
324            .get(&env)
325            .ok_or(ServerError::NOENV)?
326            .env
327            .clone();
328        // This can block forever
329        let tx = tokio::task::spawn_blocking(move || env_clone.begin_ro_txn()).await??;
330
331        // hold write lock now, because no deadlock will happen
332        let mut lg = self.state.write().await;
333        let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
334        let tx_id = env.next_id();
335        env.rotxs.insert(
336            tx_id,
337            LocalTransactionState {
338                tx: tx,
339                cursors: HashMap::new(),
340                next_cur_id: 0,
341            },
342        );
343
344        Ok(tx_id)
345    }
346
347    async fn env_rw_tx(
348        self,
349        _context: tarpc::context::Context,
350        env: u64,
351    ) -> Result<u64, ServerError> {
352        let env_clone = self
353            .state
354            .read()
355            .await
356            .envs
357            .get(&env)
358            .ok_or(ServerError::NOENV)?
359            .env
360            .clone();
361        // This can block forever
362        let tx = tokio::task::spawn_blocking(move || env_clone.begin_rw_txn()).await??;
363
364        // hold write lock now, because no deadlock will happen
365        let mut lg = self.state.write().await;
366        let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
367        let tx_id = env.next_id();
368        env.rwtxs.insert(
369            tx_id,
370            LocalTransactionState {
371                tx: tx,
372                cursors: HashMap::new(),
373                next_cur_id: 0,
374            },
375        );
376        Ok(tx_id)
377    }
378
379    async fn tx_create_db(
380        self,
381        _context: tarpc::context::Context,
382        env: u64,
383        tx: u64,
384        db: Option<String>,
385        flags: u32,
386    ) -> Result<u32, ServerError> {
387        let flags = DatabaseFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
388
389        let lg = self.state.read().await;
390        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
391
392        let db = if let Some(tx) = env.rwtxs.get(&tx) {
393            let tx = tx.tx.clone();
394            drop(lg);
395            tokio::task::spawn_blocking(move || tx.open_db_with_flags(db.as_deref(), flags))
396                .await??
397        } else if let Some(tx) = env.rotxs.get(&tx) {
398            let tx = tx.tx.clone();
399            drop(lg);
400            if flags.contains(DatabaseFlags::CREATE) {
401                return Err(ServerError::NOWRITABLE);
402            }
403            // This can block? can it?
404            // But anyway opening databases should be not so frequent so overhead should be acceptable.
405            tokio::task::spawn_blocking(move || tx.open_db(db.as_deref())).await??
406        } else {
407            return Err(ServerError::NOTX);
408        };
409
410        Ok(db.dbi())
411    }
412
413    async fn tx_get(
414        self,
415        _context: tarpc::context::Context,
416        env: u64,
417        tx: u64,
418        dbi: u32,
419        key: Vec<u8>,
420    ) -> Result<Option<Vec<u8>>, ServerError> {
421        let lg = self.state.read().await;
422        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
423
424        let val = if let Some(tx) = env.rwtxs.get(&tx) {
425            let tx = tx.tx.clone();
426            drop(lg);
427            tx.get::<Vec<u8>>(dbi, &key)?
428        } else if let Some(ro) = env.rotxs.get(&tx) {
429            let tx_clone = ro.tx.clone();
430            drop(lg);
431            tx_clone.get::<Vec<u8>>(dbi, &key)?
432        } else {
433            return Err(ServerError::NOTX);
434        };
435        Ok(val)
436    }
437
438    async fn tx_put(
439        self,
440        _context: tarpc::context::Context,
441        env: u64,
442        tx: u64,
443        dbi: u32,
444        key: Vec<u8>,
445        value: Vec<u8>,
446        flags: u32,
447    ) -> Result<(), ServerError> {
448        let flags = WriteFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
449        let lg = self.state.read().await;
450
451        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
452
453        if let Some(tx) = env.rwtxs.get(&tx) {
454            let tx = tx.tx.clone();
455            drop(lg);
456            tx.put(dbi, &key, &value, flags)?;
457        } else {
458            return Err(ServerError::NOTX);
459        }
460
461        Ok(())
462    }
463
464    async fn tx_del(
465        self,
466        _context: tarpc::context::Context,
467        env: u64,
468        tx: u64,
469        dbi: u32,
470        key: Vec<u8>,
471        value: Option<Vec<u8>>,
472    ) -> Result<bool, ServerError> {
473        let lg = self.state.read().await;
474
475        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
476
477        let ret = if let Some(tx) = env.rwtxs.get(&tx) {
478            let tx = tx.tx.clone();
479            drop(lg);
480            tx.del(dbi, &key, value.as_ref().map(|t| t.as_slice()))?
481        } else {
482            return Err(ServerError::NOTX);
483        };
484
485        Ok(ret)
486    }
487
488    async fn tx_commit(
489        self,
490        _context: tarpc::context::Context,
491        env: u64,
492        tx: u64,
493    ) -> Result<(bool, CommitLatency), ServerError> {
494        let tx = {
495            let mut lg = self.state.write().await;
496            let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
497            env.rwtxs.remove(&tx).ok_or(ServerError::NOTX)?
498        };
499
500        // This can be slow, wrap it in spawn_blocking
501        Ok(tokio::task::spawn_blocking(move || tx.tx.commit()).await??)
502    }
503
504    async fn tx_abort(
505        self,
506        _context: tarpc::context::Context,
507        env: u64,
508        tx: u64,
509    ) -> Result<(), ServerError> {
510        let (rw, ro) = {
511            let mut lg = self.state.write().await;
512
513            let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
514
515            (env.rwtxs.remove(&tx), env.rotxs.remove(&tx))
516        };
517
518        if let Some(rw) = rw {
519            drop(rw.tx);
520        } else if let Some(ro) = ro {
521            drop(ro.tx);
522        } else {
523            return Err(ServerError::NOTX);
524        }
525
526        Ok(())
527    }
528
529    async fn tx_nested(
530        self,
531        _context: tarpc::context::Context,
532        env: u64,
533        tx: u64,
534    ) -> Result<u64, ServerError> {
535        let mut tx = self
536            .state
537            .read()
538            .await
539            .envs
540            .get(&env)
541            .ok_or(ServerError::NOENV)?
542            .rwtxs
543            .get(&tx)
544            .ok_or(ServerError::NOTX)?
545            .tx
546            .clone();
547
548        // Can this block forever? Anyway, wrap it with spawn_blocking for safety.
549        let new_tx = tokio::task::spawn_blocking(move || tx.begin_nested_txn()).await??;
550
551        let mut lg = self.state.write().await;
552        let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
553        let id = env.next_id();
554        env.rwtxs.insert(
555            id,
556            LocalTransactionState {
557                tx: new_tx,
558                cursors: HashMap::new(),
559                next_cur_id: 0,
560            },
561        );
562
563        Ok(id)
564    }
565
566    async fn tx_db_stat(
567        self,
568        _context: tarpc::context::Context,
569        env: u64,
570        tx: u64,
571        dbi: u32,
572    ) -> Result<Stat, ServerError> {
573        let lg = self.state.read().await;
574
575        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
576
577        let stat = if let Some(rw) = env.rwtxs.get(&tx) {
578            let tx = rw.tx.clone();
579            drop(lg);
580            tx.db_stat_with_dbi(dbi)?
581        } else if let Some(ro) = env.rotxs.get(&tx) {
582            let tx = ro.tx.clone();
583            drop(lg);
584            tx.db_stat_with_dbi(dbi)?
585        } else {
586            return Err(ServerError::NOTX);
587        };
588
589        Ok(stat)
590    }
591
592    async fn clear_db(
593        self,
594        _context: tarpc::context::Context,
595        env: u64,
596        tx: u64,
597        dbi: u32,
598    ) -> Result<(), ServerError> {
599        let tx = self
600            .state
601            .read()
602            .await
603            .envs
604            .get(&env)
605            .ok_or(ServerError::NOENV)?
606            .rwtxs
607            .get(&tx)
608            .ok_or(ServerError::NOTX)?
609            .tx
610            .clone();
611
612        // This can be slow
613        tokio::task::spawn_blocking(move || tx.clear_db(dbi)).await??;
614        Ok(())
615    }
616
617    async fn tx_ro_cursor(
618        self,
619        _context: tarpc::context::Context,
620        env: u64,
621        tx: u64,
622        dbi: u32,
623    ) -> Result<u64, ServerError> {
624        let tx_clone = self
625            .state
626            .read()
627            .await
628            .envs
629            .get(&env)
630            .ok_or(ServerError::NOENV)?
631            .rotxs
632            .get(&tx)
633            .ok_or(ServerError::NOTX)?
634            .tx
635            .clone();
636        let cur = tokio::task::spawn_blocking(move || tx_clone.cursor_with_dbi(dbi)).await??;
637        let mut lg = self.state.write().await;
638        let tx_mut = lg
639            .envs
640            .get_mut(&env)
641            .ok_or(ServerError::NOENV)?
642            .rotxs
643            .get_mut(&tx)
644            .ok_or(ServerError::NOTX)?;
645
646        let cur_id = tx_mut.next_id();
647        tx_mut.cursors.insert(cur_id, cur);
648        return Ok(cur_id);
649    }
650
651    async fn tx_rw_cursor(
652        self,
653        _context: tarpc::context::Context,
654        env: u64,
655        tx: u64,
656        dbi: u32,
657    ) -> Result<u64, ServerError> {
658        let tx_clone = self
659            .state
660            .read()
661            .await
662            .envs
663            .get(&env)
664            .ok_or(ServerError::NOENV)?
665            .rwtxs
666            .get(&tx)
667            .ok_or(ServerError::NOTX)?
668            .tx
669            .clone();
670        let cur = tokio::task::spawn_blocking(move || tx_clone.cursor_with_dbi(dbi)).await??;
671        let mut lg = self.state.write().await;
672        let tx_mut = lg
673            .envs
674            .get_mut(&env)
675            .ok_or(ServerError::NOENV)?
676            .rwtxs
677            .get_mut(&tx)
678            .ok_or(ServerError::NOTX)?;
679
680        let cur_id = tx_mut.next_id();
681        tx_mut.cursors.insert(cur_id, cur);
682        return Ok(cur_id);
683    }
684
685    async fn cur_create(
686        self,
687        _context: tarpc::context::Context,
688        env: u64,
689        tx: u64,
690        cur: u64,
691    ) -> Result<u64, ServerError> {
692        // Cloning a cursor has an extra overhead of mdbx call, don't wrap them
693        // in a tokio::task::spawn_blocking
694        let mut lg = self.state.write().await;
695
696        let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
697
698        if let Some(tx) = env.rwtxs.get_mut(&tx) {
699            let new_cur = tx.cur_clone(cur).ok_or(ServerError::NOCURSOR)?;
700            return Ok(new_cur);
701        } else if let Some(tx) = env.rotxs.get_mut(&tx) {
702            let new_cur = tx.cur_clone(cur).ok_or(ServerError::NOCURSOR)?;
703            return Ok(new_cur);
704        } else {
705            return Err(ServerError::NOTX);
706        };
707    }
708
709    async fn cur_get(
710        self,
711        _context: tarpc::context::Context,
712        env: u64,
713        tx: u64,
714        cur: u64,
715        key: Option<Vec<u8>>,
716        data: Option<Vec<u8>>,
717        op: u32,
718    ) -> Result<(Option<Vec<u8>>, Vec<u8>, bool), ServerError> {
719        let op = mdbx_remote_sys::MDBX_cursor_op::try_from(op)
720            .map_err(|_| crate::error::Error::DecodeError)?;
721        let lg = self.state.read().await;
722
723        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
724
725        // tracing::debug!("Cursor get, cur = {}, key = {:?} op = {}", cur, key, op);
726        let val = if let Some(tx) = env.rwtxs.get(&tx) {
727            tx.cursors
728                .get(&cur)
729                .ok_or(ServerError::NOCURSOR)?
730                .get::<Vec<u8>, Vec<u8>>(
731                    key.as_ref().map(|t| t.as_slice()),
732                    data.as_ref().map(|t| t.as_slice()),
733                    op,
734                )?
735        } else if let Some(tx) = env.rotxs.get(&tx) {
736            tx.cursors
737                .get(&cur)
738                .ok_or(ServerError::NOCURSOR)?
739                .get::<Vec<u8>, Vec<u8>>(
740                    key.as_ref().map(|t| t.as_slice()),
741                    data.as_ref().map(|t| t.as_slice()),
742                    op,
743                )?
744        } else {
745            return Err(ServerError::NOTX);
746        };
747        // tracing::debug!("Cursor get down, cur = {}, key = {:?} op = {}", cur, key, op);
748        Ok(val)
749    }
750
751    async fn cur_put(
752        self,
753        _context: tarpc::context::Context,
754        env: u64,
755        tx: u64,
756        cur: u64,
757        key: Vec<u8>,
758        value: Vec<u8>,
759        flags: u32,
760    ) -> Result<(), ServerError> {
761        // tracing::debug!("Cursor put, cur = {}, key = {:?}", cur, key);
762        let op = WriteFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
763        let mut lg = self.state.write().await;
764
765        let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
766
767        if let Some(tx) = env.rwtxs.get_mut(&tx) {
768            tx.cursors
769                .get_mut(&cur)
770                .ok_or(ServerError::NOCURSOR)?
771                .put(&key, &value, op)?;
772        } else {
773            return Err(ServerError::NOTX);
774        }
775
776        // tracing::debug!("Cursor put done, cur = {}, key = {:?}", cur, key);
777        Ok(())
778    }
779
780    async fn cur_del(
781        self,
782        _context: tarpc::context::Context,
783        env: u64,
784        tx: u64,
785        cur: u64,
786        flags: u32,
787    ) -> Result<(), ServerError> {
788        // tracing::info!("cur_del {}", cur);
789        let op = WriteFlags::from_bits(flags).ok_or(ServerError::INCORRECTFLAG)?;
790        let mut lg = self.state.write().await;
791
792        let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
793
794        if let Some(tx) = env.rwtxs.get_mut(&tx) {
795            tx.cursors
796                .get_mut(&cur)
797                .ok_or(ServerError::NOCURSOR)?
798                .del(op)?;
799        } else {
800            return Err(ServerError::NOTX);
801        }
802        // tracing::info!("cur_down {}", cur);
803        Ok(())
804    }
805
806    async fn cur_close(
807        self,
808        _context: tarpc::context::Context,
809        env: u64,
810        tx: u64,
811        cur: u64,
812    ) -> Result<(), ServerError> {
813        let (rw, ro) = {
814            let mut lg = self.state.write().await;
815
816            let env = lg.envs.get_mut(&env).ok_or(ServerError::NOENV)?;
817
818            (
819                env.rwtxs.get_mut(&tx).and_then(|t| t.cursors.remove(&cur)),
820                env.rotxs.get_mut(&tx).and_then(|t| t.cursors.remove(&cur)),
821            )
822        };
823
824        if let Some(rw) = rw {
825            drop(rw);
826        } else if let Some(ro) = ro {
827            drop(ro);
828        } else {
829            return Err(ServerError::NOCURSOR);
830        }
831
832        Ok(())
833    }
834
835    async fn batch_cur_get_full(
836        self,
837        _context: tarpc::context::Context,
838        env: u64,
839        tx: u64,
840        cur: u64,
841        cnt: u64,
842        buffer: u64,
843        op: u32,
844    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServerError> {
845        let op = mdbx_remote_sys::MDBX_cursor_op::try_from(op)
846            .map_err(|_| crate::error::Error::DecodeError)?;
847
848        if !ALLOWED_GET_FULL_OPS.contains(&op) {
849            return Err(ServerError::INVALIDGETULL(op));
850        }
851        let lg = self.state.read().await;
852
853        let env = lg.envs.get(&env).ok_or(ServerError::NOENV)?;
854
855        // tracing::debug!("Cursor get, cur = {}, key = {:?} op = {}", cur, key, op);
856        if let Some(tx) = env.rwtxs.get(&tx) {
857            let cur = tx.cursors.get(&cur).ok_or(ServerError::NOCURSOR)?;
858
859            return Self::batch_cur_get_full_impl(cur, cnt, buffer, op);
860        } else if let Some(tx) = env.rotxs.get(&tx) {
861            let cur = tx.cursors.get(&cur).ok_or(ServerError::NOCURSOR)?;
862            return Self::batch_cur_get_full_impl(cur, cnt, buffer, op);
863        } else {
864            return Err(ServerError::NOTX);
865        }
866    }
867}
868
869impl RemoteMDBXServer {
870    fn batch_cur_get_full_impl<K: TransactionKind>(
871        cur: &Cursor<K>,
872        cnt: u64,
873        buffer: u64,
874        op: u32,
875    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ServerError> {
876        let mut out = Vec::new();
877        let mut current_size = 0;
878        for _ in 0..cnt {
879            if current_size >= buffer {
880                break;
881            }
882
883            match cur.get::<Vec<u8>, Vec<u8>>(None, None, op) {
884                Ok((k, v, _)) => {
885                    let key = k.ok_or(ServerError::INVALIDGETULL(op))?;
886                    current_size += key.len() as u64 + v.len() as u64;
887                    out.push((key, v));
888                }
889                Err(crate::Error::NoData | crate::Error::NotFound) => return Ok(out),
890                Err(e) => {
891                    return Err(e.into());
892                }
893            }
894        }
895
896        Ok(out)
897    }
898}