1use crate::error::{Error, Result};
4use crate::serial::*;
5use ofilter::SyncStream;
6use rocksdb::DBIteratorWithThreadMode;
7use rocksdb::DB;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10use std::fmt;
11use std::marker::PhantomData;
12
13pub struct Iter<'a, K, V>
41where
42 K: Serialize + DeserializeOwned,
43 V: Serialize + DeserializeOwned,
44{
45 pub(crate) inner: DBIteratorWithThreadMode<'a, DB>,
46 pub(crate) filter: SyncStream<Vec<u8>>,
47 pub(crate) done: usize,
48 pub(crate) phantom_data: PhantomData<(K, V)>,
49}
50
51impl<K, V> fmt::Display for Iter<'_, K, V>
52where
53 K: Serialize + DeserializeOwned,
54 V: Serialize + DeserializeOwned,
55{
56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 write!(f, "{}/?", self.done)
58 }
59}
60
61impl<'a, K, V> Iterator for Iter<'a, K, V>
62where
63 K: Serialize + DeserializeOwned,
64 V: Serialize + DeserializeOwned,
65{
66 type Item = Result<(K, V)>;
67
68 fn next(&mut self) -> Option<Result<(K, V)>> {
69 loop {
70 if let Some(res) = self.inner.next() {
71 match res {
72 Ok(item) => {
73 let (kbuf, vbuf) = item;
74 let kvec: &Vec<u8> = &kbuf.to_vec();
75 if !self.filter.check(kvec) {
76 continue;
77 }
78 return match deserialize_key(kbuf) {
79 Ok(k) => match deserialize_value(vbuf) {
80 Ok(v) => Some(Ok((k, v))),
81 Err(e) => Some(Err(e)),
82 },
83 Err(e) => Some(Err(e)),
84 };
85 }
86 Err(e) => return Some(Err(Error::from(e))),
87 }
88 } else {
89 break;
90 }
91 }
92 None
93 }
94}
95
96pub struct Keys<'a, K, V>
114where
115 K: Serialize + DeserializeOwned,
116 V: Serialize + DeserializeOwned,
117{
118 pub(crate) inner: DBIteratorWithThreadMode<'a, DB>,
119 pub(crate) filter: SyncStream<Vec<u8>>,
120 pub(crate) done: usize,
121 pub(crate) phantom_data: PhantomData<(K, V)>,
122}
123
124impl<K, V> fmt::Display for Keys<'_, K, V>
125where
126 K: Serialize + DeserializeOwned,
127 V: Serialize + DeserializeOwned,
128{
129 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130 write!(f, "{}/?", self.done)
131 }
132}
133
134impl<'a, K, V> Iterator for Keys<'a, K, V>
135where
136 K: Serialize + DeserializeOwned,
137 V: Serialize + DeserializeOwned,
138{
139 type Item = Result<K>;
140
141 fn next(&mut self) -> Option<Result<K>> {
142 loop {
143 if let Some(res) = self.inner.next() {
144 match res {
145 Ok(item) => {
146 let kvec: &Vec<u8> = &item.0.to_vec();
147 if !self.filter.check(kvec) {
148 continue;
149 }
150 return Some(deserialize_key(item.0));
151 }
152 Err(e) => return Some(Err(Error::from(e))),
153 }
154 } else {
155 break;
156 }
157 }
158 None
159 }
160}
161
162pub struct Values<'a, K, V>
180where
181 K: Serialize + DeserializeOwned,
182 V: Serialize + DeserializeOwned,
183{
184 pub(crate) inner: DBIteratorWithThreadMode<'a, DB>,
185 pub(crate) filter: SyncStream<Vec<u8>>,
186 pub(crate) done: usize,
187 pub(crate) phantom_data: PhantomData<(K, V)>,
188}
189
190impl<K, V> fmt::Display for Values<'_, K, V>
191where
192 K: Serialize + DeserializeOwned,
193 V: Serialize + DeserializeOwned,
194{
195 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
196 write!(f, "{}/?", self.done)
197 }
198}
199
200impl<'a, K, V> Iterator for Values<'a, K, V>
201where
202 K: Serialize + DeserializeOwned,
203 V: Serialize + DeserializeOwned,
204{
205 type Item = Result<V>;
206
207 fn next(&mut self) -> Option<Result<V>> {
208 loop {
209 if let Some(res) = self.inner.next() {
210 match res {
211 Ok(item) => {
212 let (kbuf, vbuf) = item;
213 let kvec: &Vec<u8> = &kbuf.to_vec();
214 if !self.filter.check(kvec) {
215 continue;
216 }
217 return Some(deserialize_value(vbuf));
218 }
219 Err(e) => return Some(Err(Error::from(e))),
220 }
221 } else {
222 break;
223 }
224 }
225 None
226 }
227}
228
229pub struct Export<'a, K, V>
251where
252 K: Serialize + DeserializeOwned,
253 V: Serialize + DeserializeOwned,
254{
255 pub(crate) inner: DBIteratorWithThreadMode<'a, DB>,
256 pub(crate) filter: SyncStream<Vec<u8>>,
257 pub(crate) done: usize,
258 pub(crate) phantom_data: PhantomData<(K, V)>,
259}
260
261impl<K, V> fmt::Display for Export<'_, K, V>
262where
263 K: Serialize + DeserializeOwned,
264 V: Serialize + DeserializeOwned,
265{
266 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
267 write!(f, "{}/?", self.done)
268 }
269}
270
271impl<'a, K, V> Export<'a, K, V>
272where
273 K: Serialize + DeserializeOwned,
274 V: Serialize + DeserializeOwned,
275{
276 pub fn try_next(&mut self) -> Result<Option<(K, V)>> {
277 loop {
278 if let Some(res) = self.inner.next() {
279 match res {
280 Ok(item) => {
281 let (kbuf, vbuf) = item;
282 let kvec: &Vec<u8> = &kbuf.to_vec();
283 if !self.filter.check(kvec) {
284 continue;
285 }
286 return match deserialize_key(kbuf) {
287 Ok(k) => match deserialize_value(vbuf) {
288 Ok(v) => Ok(Some((k, v))),
289 Err(e) => Err(e),
290 },
291 Err(e) => Err(e),
292 };
293 }
294 Err(e) => return Err(Error::from(e)),
295 }
296 } else {
297 break;
298 }
299 }
300 Ok(None)
301 }
302}
303
304impl<'a, K, V> Iterator for Export<'a, K, V>
305where
306 K: Serialize + DeserializeOwned,
307 V: Serialize + DeserializeOwned,
308{
309 type Item = (K, V);
310
311 fn next(&mut self) -> Option<(K, V)> {
312 match self.try_next() {
313 Ok(item) => item,
314 Err(e) => panic!("export error: {}", e),
315 }
316 }
317}
318
319pub struct ExportKeys<'a, K, V>
342where
343 K: Serialize + DeserializeOwned,
344 V: Serialize + DeserializeOwned,
345{
346 pub(crate) inner: DBIteratorWithThreadMode<'a, DB>,
347 pub(crate) filter: SyncStream<Vec<u8>>,
348 pub(crate) done: usize,
349 pub(crate) phantom_data: PhantomData<(K, V)>,
350}
351
352impl<K, V> fmt::Display for ExportKeys<'_, K, V>
353where
354 K: Serialize + DeserializeOwned,
355 V: Serialize + DeserializeOwned,
356{
357 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
358 write!(f, "{}/?", self.done)
359 }
360}
361
362impl<'a, K, V> ExportKeys<'a, K, V>
363where
364 K: Serialize + DeserializeOwned,
365 V: Serialize + DeserializeOwned,
366{
367 pub fn try_next(&mut self) -> Result<Option<K>> {
368 loop {
369 if let Some(res) = self.inner.next() {
370 match res {
371 Ok(item) => {
372 let kvec: &Vec<u8> = &item.0.to_vec();
373 if !self.filter.check(kvec) {
374 continue;
375 }
376 return match deserialize_key(item.0) {
377 Ok(k) => Ok(Some(k)),
378 Err(e) => Err(e),
379 };
380 }
381 Err(e) => return Err(Error::from(e)),
382 }
383 } else {
384 break;
385 }
386 }
387 Ok(None)
388 }
389}
390
391impl<'a, K, V> Iterator for ExportKeys<'a, K, V>
392where
393 K: Serialize + DeserializeOwned,
394 V: Serialize + DeserializeOwned,
395{
396 type Item = K;
397
398 fn next(&mut self) -> Option<K> {
399 match self.try_next() {
400 Ok(item) => item,
401 Err(e) => panic!("export error: {}", e),
402 }
403 }
404}
405
406pub struct ExportValues<'a, K, V>
429where
430 K: Serialize + DeserializeOwned,
431 V: Serialize + DeserializeOwned,
432{
433 pub(crate) inner: DBIteratorWithThreadMode<'a, DB>,
434 pub(crate) filter: SyncStream<Vec<u8>>,
435 pub(crate) done: usize,
436 pub(crate) phantom_data: PhantomData<(K, V)>,
437}
438
439impl<K, V> fmt::Display for ExportValues<'_, K, V>
440where
441 K: Serialize + DeserializeOwned,
442 V: Serialize + DeserializeOwned,
443{
444 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
445 write!(f, "{}/?", self.done)
446 }
447}
448
449impl<'a, K, V> ExportValues<'a, K, V>
450where
451 K: Serialize + DeserializeOwned,
452 V: Serialize + DeserializeOwned,
453{
454 pub fn try_next(&mut self) -> Result<Option<V>> {
455 loop {
456 if let Some(res) = self.inner.next() {
457 match res {
458 Ok(item) => {
459 let (kbuf, vbuf) = item;
460 let kvec: &Vec<u8> = &kbuf.to_vec();
461 if !self.filter.check(kvec) {
462 continue;
463 };
464 return match deserialize_value(vbuf) {
465 Ok(v) => Ok(Some(v)),
466 Err(e) => Err(e),
467 };
468 }
469 Err(e) => return Err(Error::from(e)),
470 }
471 } else {
472 break;
473 }
474 }
475 Ok(None)
476 }
477}
478
479impl<'a, K, V> Iterator for ExportValues<'a, K, V>
480where
481 K: Serialize + DeserializeOwned,
482 V: Serialize + DeserializeOwned,
483{
484 type Item = V;
485
486 fn next(&mut self) -> Option<V> {
487 match self.try_next() {
488 Ok(item) => item,
489 Err(e) => panic!("export error: {}", e),
490 }
491 }
492}
493
494pub struct IterCfNames<'a> {
513 pub(crate) idx: isize,
514 pub(crate) other_cfs: &'a Vec<String>,
515}
516
517impl fmt::Display for IterCfNames<'_> {
518 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
519 write!(f, "{}/{}", self.idx + 1, self.other_cfs.len() + 1)
520 }
521}
522
523impl<'a> Iterator for IterCfNames<'a> {
524 type Item = &'a str;
525
526 fn next(&mut self) -> Option<&'a str> {
527 if self.idx < 0 {
528 self.idx = 0;
529 return Some("");
530 }
531 if self.idx as usize >= self.other_cfs.len() {
532 return None;
533 }
534 let cf_name = self.other_cfs[self.idx as usize].as_str();
535 self.idx += 1;
536 Some(cf_name)
537 }
538
539 fn size_hint(&self) -> (usize, Option<usize>) {
540 let remaining = self.other_cfs.len() + 1 - ((self.idx + 1) as usize);
541 (remaining, Some(remaining))
542 }
543}
544
545impl<'a> ExactSizeIterator for IterCfNames<'a> {}