1use std::any::Any;
9use std::fmt;
10use std::sync::Arc;
11use std::sync::Mutex;
12use std::sync::MutexGuard;
13
14use indexmap::IndexSet;
15use nonblocking::non_blocking_result;
16
17use super::hints::Flags;
18use super::id_static::IdStaticSet;
19use super::AsyncNameSetQuery;
20use super::BoxVertexStream;
21use super::Hints;
22use crate::ops::DagAlgorithm;
23use crate::ops::IdConvert;
24use crate::protocol::disable_remote_protocol;
25use crate::Group;
26use crate::Id;
27use crate::IdSet;
28use crate::Result;
29use crate::VertexName;
30
31pub struct IdLazySet {
33 inner: Arc<Mutex<Inner>>,
36 pub map: Arc<dyn IdConvert + Send + Sync>,
37 pub(crate) dag: Arc<dyn DagAlgorithm + Send + Sync>,
38 hints: Hints,
39}
40
41struct Inner {
42 iter: Box<dyn Iterator<Item = Result<Id>> + Send + Sync>,
43 visited: IndexSet<Id>,
44 state: State,
45}
46
47impl Inner {
48 fn load_more(&mut self, n: usize, mut out: Option<&mut Vec<Id>>) -> Result<()> {
49 if matches!(self.state, State::Complete | State::Error) {
50 return Ok(());
51 }
52 for _ in 0..n {
53 match self.iter.next() {
54 Some(Ok(id)) => {
55 if let Some(ref mut out) = out {
56 out.push(id);
57 }
58 self.visited.insert(id);
59 }
60 None => {
61 self.state = State::Complete;
62 break;
63 }
64 Some(Err(err)) => {
65 self.state = State::Error;
66 return Err(err);
67 }
68 }
69 }
70 Ok(())
71 }
72}
73
74#[derive(Copy, Clone, Debug, PartialEq)]
75enum State {
76 Incomplete,
77 Complete,
78 Error,
79}
80
81pub struct Iter {
82 inner: Arc<Mutex<Inner>>,
83 index: usize,
84 map: Arc<dyn IdConvert + Send + Sync>,
85}
86
87impl Iter {
88 fn into_box_stream(self) -> BoxVertexStream {
89 Box::pin(futures::stream::unfold(self, |this| this.next()))
90 }
91
92 async fn next(mut self) -> Option<(Result<VertexName>, Self)> {
93 loop {
94 let state = {
95 let inner = self.inner.lock().unwrap();
96 inner.state
97 };
98 match state {
99 State::Error => break None,
100 State::Complete if self.inner.lock().unwrap().visited.len() <= self.index => {
101 break None;
102 }
103 State::Complete | State::Incomplete => {
104 let opt_id = {
105 let inner = self.inner.lock().unwrap();
106 inner.visited.get_index(self.index).cloned()
107 };
108 match opt_id {
109 Some(id) => {
110 self.index += 1;
111 match self.map.vertex_name(id).await {
112 Err(err) => {
113 self.inner.lock().unwrap().state = State::Error;
114 return Some((Err(err), self));
115 }
116 Ok(vertex) => {
117 break Some((Ok(vertex), self));
118 }
119 }
120 }
121 None => {
122 let more = {
124 let mut inner = self.inner.lock().unwrap();
125 inner.load_more(1, None)
126 };
127 if let Err(err) = more {
128 return Some((Err(err), self));
129 }
130 }
131 }
132 }
133 }
134 }
135 }
136}
137
138struct DebugId {
139 id: Id,
140 name: Option<VertexName>,
141}
142
143impl fmt::Debug for DebugId {
144 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145 if let Some(name) = &self.name {
146 fmt::Debug::fmt(&name, f)?;
147 write!(f, "+{:?}", self.id)?;
148 } else {
149 write!(f, "{:?}", self.id)?;
150 }
151 Ok(())
152 }
153}
154
155impl fmt::Debug for IdLazySet {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 f.write_str("<lazy ")?;
158 let inner = self.inner.lock().unwrap();
159 let limit = f.width().unwrap_or(3);
160 f.debug_list()
161 .entries(inner.visited.iter().take(limit).map(|&id| DebugId {
162 id,
163 name: disable_remote_protocol(|| {
164 non_blocking_result(self.map.vertex_name(id)).ok()
165 }),
166 }))
167 .finish()?;
168 let remaining = inner.visited.len().max(limit) - limit;
169 match (remaining, inner.state) {
170 (0, State::Incomplete) => f.write_str(" + ? more")?,
171 (n, State::Incomplete) => write!(f, "+ {} + ? more", n)?,
172 (0, _) => {}
173 (n, _) => write!(f, " + {} more", n)?,
174 }
175 f.write_str(">")?;
176 Ok(())
177 }
178}
179
180impl IdLazySet {
181 pub fn from_iter_idmap_dag<I>(
182 names: I,
183 map: Arc<dyn IdConvert + Send + Sync>,
184 dag: Arc<dyn DagAlgorithm + Send + Sync>,
185 ) -> Self
186 where
187 I: IntoIterator<Item = Result<Id>> + 'static,
188 <I as IntoIterator>::IntoIter: Send + Sync,
189 {
190 let iter = names.into_iter();
191 let inner = Inner {
192 iter: Box::new(iter),
193 visited: IndexSet::new(),
194 state: State::Incomplete,
195 };
196 let hints = Hints::new_with_idmap_dag(map.clone(), dag.clone());
197 Self {
198 inner: Arc::new(Mutex::new(inner)),
199 map,
200 dag,
201 hints,
202 }
203 }
204
205 pub fn to_static(&self) -> Result<IdStaticSet> {
207 let inner = self.load_all()?;
208 let mut spans = IdSet::empty();
209 for &id in inner.visited.iter() {
210 spans.push(id);
211 }
212 Ok(IdStaticSet::from_spans_idmap_dag(
213 spans,
214 self.map.clone(),
215 self.dag.clone(),
216 ))
217 }
218
219 fn load_all(&self) -> Result<MutexGuard<Inner>> {
220 let mut inner = self.inner.lock().unwrap();
221 inner.load_more(usize::max_value(), None)?;
222 Ok(inner)
223 }
224}
225
226#[async_trait::async_trait]
227impl AsyncNameSetQuery for IdLazySet {
228 async fn iter(&self) -> Result<BoxVertexStream> {
229 let inner = self.inner.clone();
230 let map = self.map.clone();
231 let iter = Iter {
232 inner,
233 index: 0,
234 map,
235 };
236 Ok(iter.into_box_stream())
237 }
238
239 async fn iter_rev(&self) -> Result<BoxVertexStream> {
240 let inner = self.load_all()?;
241 struct State {
242 map: Arc<dyn IdConvert + Send + Sync>,
243 iter: Box<dyn Iterator<Item = Id> + Send>,
244 }
245 let state = State {
246 map: self.map.clone(),
247 iter: Box::new(inner.visited.clone().into_iter().rev()),
248 };
249 async fn next(mut state: State) -> Option<(Result<VertexName>, State)> {
250 match state.iter.next() {
251 None => None,
252 Some(id) => {
253 let result = state.map.vertex_name(id).await;
254 Some((result, state))
255 }
256 }
257 }
258
259 let stream = futures::stream::unfold(state, next);
260 Ok(Box::pin(stream))
261 }
262
263 async fn count(&self) -> Result<usize> {
264 let inner = self.load_all()?;
265 Ok(inner.visited.len())
266 }
267
268 async fn last(&self) -> Result<Option<VertexName>> {
269 let opt_id = {
270 let inner = self.load_all()?;
271 inner.visited.iter().rev().nth(0).cloned()
272 };
273 match opt_id {
274 Some(id) => Ok(Some(self.map.vertex_name(id).await?)),
275 None => Ok(None),
276 }
277 }
278
279 async fn contains(&self, name: &VertexName) -> Result<bool> {
280 let id = match self
281 .map
282 .vertex_id_with_max_group(name, Group::NON_MASTER)
283 .await?
284 {
285 None => {
286 return Ok(false);
287 }
288 Some(id) => id,
289 };
290 let mut inner = self.inner.lock().unwrap();
291 if inner.visited.contains(&id) {
292 return Ok(true);
293 } else {
294 let mut loaded = Vec::new();
295 loop {
296 if let Some(&last_id) = inner.visited.iter().rev().next() {
298 let hints = self.hints();
299 if hints.contains(Flags::ID_DESC) {
300 if last_id < id {
301 return Ok(false);
302 }
303 } else if hints.contains(Flags::ID_ASC) {
304 if last_id > id {
305 return Ok(false);
306 }
307 }
308 }
309 loaded.clear();
310 inner.load_more(1, Some(&mut loaded))?;
311 debug_assert!(loaded.len() <= 1);
312 if loaded.is_empty() {
313 break;
314 }
315 if loaded.first() == Some(&id) {
316 return Ok(true);
317 }
318 }
319 }
320 Ok(false)
321 }
322
323 async fn contains_fast(&self, name: &VertexName) -> Result<Option<bool>> {
324 let id = match self
325 .map
326 .vertex_id_with_max_group(name, Group::NON_MASTER)
327 .await?
328 {
329 None => {
330 return Ok(Some(false));
331 }
332 Some(id) => id,
333 };
334 let inner = self.inner.lock().unwrap();
335 if inner.visited.contains(&id) {
336 return Ok(Some(true));
337 } else if inner.state != State::Incomplete {
338 return Ok(Some(false));
339 }
340 Ok(None)
341 }
342
343 fn as_any(&self) -> &dyn Any {
344 self
345 }
346
347 fn hints(&self) -> &Hints {
348 &self.hints
349 }
350
351 fn id_convert(&self) -> Option<&dyn IdConvert> {
352 Some(self.map.as_ref() as &dyn IdConvert)
353 }
354}
355
356#[cfg(test)]
357pub(crate) mod test_utils {
358 use std::sync::atomic::AtomicU64;
359 use std::sync::atomic::Ordering::AcqRel;
360
361 use super::*;
362 use crate::ops::PrefixLookup;
363 use crate::tests::dummy_dag::DummyDag;
364 use crate::VerLink;
365
366 static STR_ID_MAP_ID: AtomicU64 = AtomicU64::new(0);
367
368 pub(crate) struct StrIdMap {
369 id: String,
370 version: VerLink,
371 }
372
373 impl StrIdMap {
374 pub(crate) fn new() -> Self {
375 Self {
376 id: format!("str:{}", STR_ID_MAP_ID.fetch_add(1, AcqRel)),
377 version: VerLink::new(),
378 }
379 }
380 }
381
382 #[async_trait::async_trait]
383 impl PrefixLookup for StrIdMap {
384 async fn vertexes_by_hex_prefix(&self, _: &[u8], _: usize) -> Result<Vec<VertexName>> {
385 Ok(Vec::new())
387 }
388 }
389 #[async_trait::async_trait]
390 impl IdConvert for StrIdMap {
391 async fn vertex_id(&self, name: VertexName) -> Result<Id> {
392 let slice: [u8; 8] = name.as_ref().try_into().unwrap();
393 let id = u64::from_le(unsafe { std::mem::transmute(slice) });
394 Ok(Id(id))
395 }
396 async fn vertex_id_with_max_group(
397 &self,
398 name: &VertexName,
399 _max_group: Group,
400 ) -> Result<Option<Id>> {
401 if name.as_ref().len() == 8 {
402 let id = self.vertex_id(name.clone()).await?;
403 Ok(Some(id))
404 } else {
405 Ok(None)
406 }
407 }
408 async fn vertex_name(&self, id: Id) -> Result<VertexName> {
409 let buf: [u8; 8] = unsafe { std::mem::transmute(id.0.to_le()) };
410 Ok(VertexName::copy_from(&buf))
411 }
412 async fn contains_vertex_name(&self, name: &VertexName) -> Result<bool> {
413 Ok(name.as_ref().len() == 8)
414 }
415 fn map_id(&self) -> &str {
416 &self.id
417 }
418 fn map_version(&self) -> &VerLink {
419 &self.version
420 }
421 async fn contains_vertex_id_locally(&self, ids: &[Id]) -> Result<Vec<bool>> {
422 Ok(ids.iter().map(|_| true).collect())
423 }
424 async fn contains_vertex_name_locally(&self, names: &[VertexName]) -> Result<Vec<bool>> {
425 Ok(names.iter().map(|name| name.as_ref().len() == 8).collect())
426 }
427 }
428
429 pub fn lazy_set(a: &[u64]) -> IdLazySet {
430 let ids: Vec<Id> = a.iter().map(|i| Id(*i as _)).collect();
431 IdLazySet::from_iter_idmap_dag(
432 ids.into_iter().map(Ok),
433 Arc::new(StrIdMap::new()),
434 Arc::new(DummyDag::new()),
435 )
436 }
437
438 pub fn lazy_set_inherit(a: &[u64], set: &IdLazySet) -> IdLazySet {
439 let ids: Vec<Id> = a.iter().map(|i| Id(*i as _)).collect();
440 IdLazySet::from_iter_idmap_dag(ids.into_iter().map(Ok), set.map.clone(), set.dag.clone())
441 }
442}
443
444#[cfg(all(test, feature = "indexedlog-backend"))]
445#[allow(clippy::redundant_clone)]
446pub(crate) mod tests {
447 use std::collections::HashSet;
448
449 use nonblocking::non_blocking_result as r;
450
451 use super::super::tests::*;
452 use super::super::NameSet;
453 use super::test_utils::*;
454 use super::*;
455
456 #[test]
457 fn test_id_lazy_basic() -> Result<()> {
458 let set = lazy_set(&[0x11, 0x33, 0x22, 0x77, 0x55]);
459 check_invariants(&set)?;
460 assert_eq!(shorten_iter(ni(set.iter())), ["11", "33", "22", "77", "55"]);
461 assert_eq!(
462 shorten_iter(ni(set.iter_rev())),
463 ["55", "77", "22", "33", "11"]
464 );
465 assert!(!nb(set.is_empty())?);
466 assert_eq!(nb(set.count())?, 5);
467 assert_eq!(shorten_name(nb(set.first())?.unwrap()), "11");
468 assert_eq!(shorten_name(nb(set.last())?.unwrap()), "55");
469 Ok(())
470 }
471
472 #[test]
473 fn test_hints_fast_paths() -> Result<()> {
474 let set = lazy_set(&[0x20, 0x50, 0x30, 0x70]);
475
476 set.hints().add_flags(Flags::ID_ASC);
478
479 let v = |i: u64| -> VertexName { r(StrIdMap::new().vertex_name(Id(i))).unwrap() };
480 assert!(nb(set.contains(&v(0x20)))?);
481 assert!(nb(set.contains(&v(0x50)))?);
482 assert!(!nb(set.contains(&v(0x30)))?);
483
484 set.hints().add_flags(Flags::ID_DESC);
485 assert!(nb(set.contains(&v(0x30)))?);
486 assert!(!nb(set.contains(&v(0x70)))?);
487
488 Ok(())
489 }
490
491 #[test]
492 fn test_debug() {
493 let set = lazy_set(&[0]);
494 assert_eq!(format!("{:?}", set), "<lazy [] + ? more>");
495 nb(set.count()).unwrap();
496 assert_eq!(format!("{:?}", set), "<lazy [0000000000000000+0]>");
497
498 let set = lazy_set(&[1, 3, 2]);
499 assert_eq!(format!("{:?}", &set), "<lazy [] + ? more>");
500 let mut iter = ni(set.iter()).unwrap();
501 iter.next();
502 assert_eq!(
503 format!("{:?}", &set),
504 "<lazy [0100000000000000+1] + ? more>"
505 );
506 iter.next();
507 assert_eq!(
508 format!("{:?}", &set),
509 "<lazy [0100000000000000+1, 0300000000000000+3] + ? more>"
510 );
511 iter.next();
512 assert_eq!(format!("{:2.2?}", &set), "<lazy [01+1, 03+3]+ 1 + ? more>");
513 iter.next();
514 assert_eq!(format!("{:1.3?}", &set), "<lazy [010+1] + 2 more>");
515 }
516
517 #[test]
518 fn test_flatten() {
519 let set1 = lazy_set(&[3, 2, 4]);
520 let set2 = lazy_set_inherit(&[3, 7, 6], &set1);
521 let set1 = NameSet::from_query(set1);
522 let set2 = NameSet::from_query(set2);
523
524 let show = |set: NameSet| {
527 [
528 format!("{:5.2?}", r(set.flatten_names()).unwrap()),
529 format!("{:5.2?}", r(set.flatten()).unwrap()),
530 ]
531 };
532
533 assert_eq!(
534 show(set1.clone() | set2.clone()),
535 [
536 "<static [03, 02, 04, 07, 06]>",
537 "<spans [06:07+6:7, 02:04+2:4]>"
538 ]
539 );
540 assert_eq!(
541 show(set1.clone() & set2.clone()),
542 ["<static [03]>", "<spans [03+3]>"]
543 );
544 assert_eq!(
545 show(set1.clone() - set2.clone()),
546 ["<static [02, 04]>", "<spans [04+4, 02+2]>"]
547 );
548 }
549
550 quickcheck::quickcheck! {
551 fn test_id_lazy_quickcheck(a: Vec<u64>) -> bool {
552 let set = lazy_set(&a);
553 check_invariants(&set).unwrap();
554
555 let count = nb(set.count()).unwrap();
556 assert!(count <= a.len());
557
558 let set2: HashSet<_> = a.iter().cloned().collect();
559 assert_eq!(count, set2.len());
560
561 true
562 }
563 }
564}