stored/
service.rs

1// Storage daemon (stored): microservice frontend for different storage backends
2// used in LNP/BP nodes.
3//
4// Written in 2022 by
5//     Dr. Maxim Orlovsky <orlovsky@lnp-bp.org>
6//
7// Copyright (C) 2022 by LNP/BP Standards Association, Switzerland.
8//
9// You should have received a copy of the MIT License along with this software.
10// If not, see <https://opensource.org/licenses/MIT>.
11
12use std::collections::{BTreeSet, HashMap};
13
14use amplify::Slice32;
15use bitcoin_hashes::Hash;
16use commit_verify::commit_encode::ConsensusCommit;
17use internet2::session::LocalSession;
18use internet2::{
19    CreateUnmarshaller, SendRecvMessage, TypedEnum, Unmarshall, Unmarshaller, ZmqSocketType,
20};
21use microservices::error::BootstrapError;
22use microservices::node::TryService;
23use microservices::rpc::ClientError;
24use microservices::ZMQ_CONTEXT;
25use store_rpc::{CheckUnknownReq, InsertReq, PrimaryKey, Reply, Request, RetrieveReq, StoreReq};
26use storm::{Chunk, ChunkId};
27use strict_encoding::{StrictDecode, StrictEncode};
28
29use crate::{Config, DaemonError, LaunchError, STORED_STORAGE_FILE};
30
31pub fn run(config: Config) -> Result<(), BootstrapError<LaunchError>> {
32    let runtime = Runtime::init(config)?;
33
34    runtime.run_or_panic("stored");
35
36    Ok(())
37}
38
39pub struct Runtime {
40    /// Stored sessions
41    pub(super) session_rpc: LocalSession,
42
43    /// Unmarshaller instance used for parsing RPC request
44    pub(super) unmarshaller: Unmarshaller<Request>,
45
46    pub(super) db: sled::Db,
47
48    pub(super) trees: HashMap<String, sled::Tree>,
49}
50
51impl Runtime {
52    pub fn init(config: Config) -> Result<Self, BootstrapError<LaunchError>> {
53        // debug!("Initializing storage provider {:?}", config.storage_conf());
54        // let storage = storage::FileDriver::with(config.storage_conf())?;
55
56        debug!("Opening RPC API socket {}", config.rpc_endpoint);
57        let session_rpc = LocalSession::connect(
58            ZmqSocketType::Rep,
59            &config.rpc_endpoint,
60            None,
61            None,
62            &ZMQ_CONTEXT,
63        )?;
64
65        let (db, trees) = Self::init_db(&config)?;
66
67        info!("Stored runtime started successfully");
68
69        Ok(Self {
70            session_rpc,
71            unmarshaller: Request::create_unmarshaller(),
72            db,
73            trees,
74        })
75    }
76
77    fn init_db(config: &Config) -> Result<(sled::Db, HashMap<String, sled::Tree>), LaunchError> {
78        let mut db_path = config.data_dir.clone();
79        db_path.push(STORED_STORAGE_FILE);
80        debug!("Opening database at {}", db_path.display());
81        let db = sled::open(db_path)?;
82        let trees = config
83            .databases
84            .iter()
85            .map(|name| db.open_tree(name).map(|tree| (name.clone(), tree)))
86            .collect::<Result<HashMap<_, _>, _>>()?;
87        Ok((db, trees))
88    }
89}
90
91impl TryService for Runtime {
92    type ErrorType = ClientError;
93
94    fn try_run_loop(mut self) -> Result<(), Self::ErrorType> {
95        loop {
96            match self.run() {
97                Ok(_) => debug!("API request processing complete"),
98                Err(err) => {
99                    error!("Error processing API request: {}", err);
100                    Err(err)?;
101                }
102            }
103        }
104    }
105}
106
107impl Runtime {
108    fn run(&mut self) -> Result<(), ClientError> {
109        trace!("Awaiting for ZMQ RPC requests...");
110        let raw = self.session_rpc.recv_raw_message()?;
111        let reply = self.rpc_process(raw).unwrap_or_else(|err| err);
112        trace!("Preparing ZMQ RPC reply: {:?}", reply);
113        let data = reply.serialize();
114        trace!("Sending {} bytes back to the client over ZMQ RPC", data.len());
115        self.session_rpc.send_raw_message(&data)?;
116        Ok(())
117    }
118}
119
120impl Runtime {
121    pub(crate) fn rpc_process(&mut self, raw: Vec<u8>) -> Result<Reply, Reply> {
122        trace!("Got {} bytes over ZMQ RPC", raw.len());
123        let request = (*self.unmarshaller.unmarshall(raw.as_slice())?).clone();
124        debug!("Received ZMQ RPC request #{}: {}", request.get_type(), request);
125        match request {
126            Request::Use(table) => self.use_table(table),
127            Request::Tables => self.list_tables(),
128            Request::Count(table) => self.count(table),
129            Request::Store(StoreReq { table, key, chunk }) => self.store(table, key, chunk),
130            Request::Retrieve(RetrieveReq { table, key }) => self.retrieve(table, key),
131            Request::Insert(InsertReq { table, key, item }) => self.insert(table, key, item),
132            Request::ListIds(table) => self.list_ids(table),
133            Request::CheckUnknown(CheckUnknownReq { table, ids }) => self.filter_ids(table, ids),
134        }
135        .map_err(Reply::from)
136    }
137
138    fn use_table(&mut self, table: String) -> Result<Reply, DaemonError> {
139        let tree = self.db.open_tree(&table)?;
140        self.trees.insert(table, tree);
141        Ok(Reply::Success)
142    }
143
144    fn list_tables(&self) -> Result<Reply, DaemonError> {
145        let tables = self.trees.keys().cloned().collect();
146        Ok(Reply::Tables(tables))
147    }
148
149    fn count(&self, table: String) -> Result<Reply, DaemonError> {
150        let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
151        let count = tree.len();
152        Ok(Reply::Count(count as u64))
153    }
154
155    fn store(
156        &self,
157        table: String,
158        key: impl PrimaryKey,
159        chunk: Chunk,
160    ) -> Result<Reply, DaemonError> {
161        let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
162        let chunk_id = chunk.consensus_commit();
163        tree.insert(key.into_slice32(), chunk.as_ref())?;
164        tree.flush()?;
165        Ok(Reply::ChunkId(chunk_id))
166    }
167
168    fn retrieve(&self, table: String, key: impl PrimaryKey) -> Result<Reply, DaemonError> {
169        let key = key.into_slice32();
170        let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
171        Ok(match tree.get(key)? {
172            None => Reply::KeyAbsent(key),
173            Some(data) => Reply::Chunk(data.as_ref().try_into()?),
174        })
175    }
176
177    fn insert(
178        &self,
179        table: String,
180        key: impl PrimaryKey,
181        item: Slice32,
182    ) -> Result<Reply, DaemonError> {
183        let key = key.into_slice32();
184        let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
185        let data = tree.get(key)?.unwrap_or_default();
186        let mut set = if data.is_empty() {
187            BTreeSet::new()
188        } else {
189            BTreeSet::<Slice32>::strict_deserialize(data)?
190        };
191        set.insert(item);
192        tree.insert(key, set.strict_serialize()?)?;
193        tree.flush()?;
194        Ok(Reply::Success)
195    }
196
197    fn list_ids(&self, table: String) -> Result<Reply, DaemonError> {
198        let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
199        let keys = tree
200            .range::<&[u8], _>(..)
201            .map(|res| match res {
202                Ok((ivec, _)) => Ok(ChunkId::from_slice(&ivec)
203                    .map_err(|_| sled::Error::ReportableBug(s!("non-standard id")))?),
204                Err(e) => Err(e),
205            })
206            .collect::<Result<BTreeSet<_>, sled::Error>>()?;
207        Ok(Reply::Ids(keys))
208    }
209
210    fn filter_ids(&self, table: String, mut ids: BTreeSet<ChunkId>) -> Result<Reply, DaemonError> {
211        let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
212        // TODO: Improve efficiency by restricting the range
213        for res in tree.range::<&[u8], _>(..) {
214            let (ivec, _) = res?;
215            let id = ChunkId::from_slice(&ivec).map_err(|_| {
216                DaemonError::Encoding(strict_encoding::Error::DataIntegrityError(s!(
217                    "invalid chunk id data"
218                )))
219            })?;
220            ids.remove(&id);
221        }
222        Ok(Reply::Ids(ids))
223    }
224}