1use std::collections::{BTreeSet, HashMap};
13
14use amplify::Slice32;
15use bitcoin_hashes::Hash;
16use commit_verify::commit_encode::ConsensusCommit;
17use internet2::session::LocalSession;
18use internet2::{
19 CreateUnmarshaller, SendRecvMessage, TypedEnum, Unmarshall, Unmarshaller, ZmqSocketType,
20};
21use microservices::error::BootstrapError;
22use microservices::node::TryService;
23use microservices::rpc::ClientError;
24use microservices::ZMQ_CONTEXT;
25use store_rpc::{CheckUnknownReq, InsertReq, PrimaryKey, Reply, Request, RetrieveReq, StoreReq};
26use storm::{Chunk, ChunkId};
27use strict_encoding::{StrictDecode, StrictEncode};
28
29use crate::{Config, DaemonError, LaunchError, STORED_STORAGE_FILE};
30
31pub fn run(config: Config) -> Result<(), BootstrapError<LaunchError>> {
32 let runtime = Runtime::init(config)?;
33
34 runtime.run_or_panic("stored");
35
36 Ok(())
37}
38
39pub struct Runtime {
40 pub(super) session_rpc: LocalSession,
42
43 pub(super) unmarshaller: Unmarshaller<Request>,
45
46 pub(super) db: sled::Db,
47
48 pub(super) trees: HashMap<String, sled::Tree>,
49}
50
51impl Runtime {
52 pub fn init(config: Config) -> Result<Self, BootstrapError<LaunchError>> {
53 debug!("Opening RPC API socket {}", config.rpc_endpoint);
57 let session_rpc = LocalSession::connect(
58 ZmqSocketType::Rep,
59 &config.rpc_endpoint,
60 None,
61 None,
62 &ZMQ_CONTEXT,
63 )?;
64
65 let (db, trees) = Self::init_db(&config)?;
66
67 info!("Stored runtime started successfully");
68
69 Ok(Self {
70 session_rpc,
71 unmarshaller: Request::create_unmarshaller(),
72 db,
73 trees,
74 })
75 }
76
77 fn init_db(config: &Config) -> Result<(sled::Db, HashMap<String, sled::Tree>), LaunchError> {
78 let mut db_path = config.data_dir.clone();
79 db_path.push(STORED_STORAGE_FILE);
80 debug!("Opening database at {}", db_path.display());
81 let db = sled::open(db_path)?;
82 let trees = config
83 .databases
84 .iter()
85 .map(|name| db.open_tree(name).map(|tree| (name.clone(), tree)))
86 .collect::<Result<HashMap<_, _>, _>>()?;
87 Ok((db, trees))
88 }
89}
90
91impl TryService for Runtime {
92 type ErrorType = ClientError;
93
94 fn try_run_loop(mut self) -> Result<(), Self::ErrorType> {
95 loop {
96 match self.run() {
97 Ok(_) => debug!("API request processing complete"),
98 Err(err) => {
99 error!("Error processing API request: {}", err);
100 Err(err)?;
101 }
102 }
103 }
104 }
105}
106
107impl Runtime {
108 fn run(&mut self) -> Result<(), ClientError> {
109 trace!("Awaiting for ZMQ RPC requests...");
110 let raw = self.session_rpc.recv_raw_message()?;
111 let reply = self.rpc_process(raw).unwrap_or_else(|err| err);
112 trace!("Preparing ZMQ RPC reply: {:?}", reply);
113 let data = reply.serialize();
114 trace!("Sending {} bytes back to the client over ZMQ RPC", data.len());
115 self.session_rpc.send_raw_message(&data)?;
116 Ok(())
117 }
118}
119
120impl Runtime {
121 pub(crate) fn rpc_process(&mut self, raw: Vec<u8>) -> Result<Reply, Reply> {
122 trace!("Got {} bytes over ZMQ RPC", raw.len());
123 let request = (*self.unmarshaller.unmarshall(raw.as_slice())?).clone();
124 debug!("Received ZMQ RPC request #{}: {}", request.get_type(), request);
125 match request {
126 Request::Use(table) => self.use_table(table),
127 Request::Tables => self.list_tables(),
128 Request::Count(table) => self.count(table),
129 Request::Store(StoreReq { table, key, chunk }) => self.store(table, key, chunk),
130 Request::Retrieve(RetrieveReq { table, key }) => self.retrieve(table, key),
131 Request::Insert(InsertReq { table, key, item }) => self.insert(table, key, item),
132 Request::ListIds(table) => self.list_ids(table),
133 Request::CheckUnknown(CheckUnknownReq { table, ids }) => self.filter_ids(table, ids),
134 }
135 .map_err(Reply::from)
136 }
137
138 fn use_table(&mut self, table: String) -> Result<Reply, DaemonError> {
139 let tree = self.db.open_tree(&table)?;
140 self.trees.insert(table, tree);
141 Ok(Reply::Success)
142 }
143
144 fn list_tables(&self) -> Result<Reply, DaemonError> {
145 let tables = self.trees.keys().cloned().collect();
146 Ok(Reply::Tables(tables))
147 }
148
149 fn count(&self, table: String) -> Result<Reply, DaemonError> {
150 let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
151 let count = tree.len();
152 Ok(Reply::Count(count as u64))
153 }
154
155 fn store(
156 &self,
157 table: String,
158 key: impl PrimaryKey,
159 chunk: Chunk,
160 ) -> Result<Reply, DaemonError> {
161 let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
162 let chunk_id = chunk.consensus_commit();
163 tree.insert(key.into_slice32(), chunk.as_ref())?;
164 tree.flush()?;
165 Ok(Reply::ChunkId(chunk_id))
166 }
167
168 fn retrieve(&self, table: String, key: impl PrimaryKey) -> Result<Reply, DaemonError> {
169 let key = key.into_slice32();
170 let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
171 Ok(match tree.get(key)? {
172 None => Reply::KeyAbsent(key),
173 Some(data) => Reply::Chunk(data.as_ref().try_into()?),
174 })
175 }
176
177 fn insert(
178 &self,
179 table: String,
180 key: impl PrimaryKey,
181 item: Slice32,
182 ) -> Result<Reply, DaemonError> {
183 let key = key.into_slice32();
184 let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
185 let data = tree.get(key)?.unwrap_or_default();
186 let mut set = if data.is_empty() {
187 BTreeSet::new()
188 } else {
189 BTreeSet::<Slice32>::strict_deserialize(data)?
190 };
191 set.insert(item);
192 tree.insert(key, set.strict_serialize()?)?;
193 tree.flush()?;
194 Ok(Reply::Success)
195 }
196
197 fn list_ids(&self, table: String) -> Result<Reply, DaemonError> {
198 let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
199 let keys = tree
200 .range::<&[u8], _>(..)
201 .map(|res| match res {
202 Ok((ivec, _)) => Ok(ChunkId::from_slice(&ivec)
203 .map_err(|_| sled::Error::ReportableBug(s!("non-standard id")))?),
204 Err(e) => Err(e),
205 })
206 .collect::<Result<BTreeSet<_>, sled::Error>>()?;
207 Ok(Reply::Ids(keys))
208 }
209
210 fn filter_ids(&self, table: String, mut ids: BTreeSet<ChunkId>) -> Result<Reply, DaemonError> {
211 let tree = self.trees.get(&table).ok_or(DaemonError::UnknownTable(table))?;
212 for res in tree.range::<&[u8], _>(..) {
214 let (ivec, _) = res?;
215 let id = ChunkId::from_slice(&ivec).map_err(|_| {
216 DaemonError::Encoding(strict_encoding::Error::DataIntegrityError(s!(
217 "invalid chunk id data"
218 )))
219 })?;
220 ids.remove(&id);
221 }
222 Ok(Reply::Ids(ids))
223 }
224}