Skip to main content

dag/tests/
test_dag.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::collections::HashMap;
9use std::ops::Deref;
10use std::ops::DerefMut;
11use std::sync::Arc;
12use std::sync::Mutex;
13
14use futures::StreamExt;
15use futures::TryStreamExt;
16use nonblocking::non_blocking;
17use nonblocking::non_blocking_result;
18use tracing::debug;
19
20use crate::ops::CheckIntegrity;
21use crate::ops::DagAddHeads;
22use crate::ops::DagAlgorithm;
23use crate::ops::DagExportCloneData;
24use crate::ops::DagExportPullData;
25use crate::ops::DagImportCloneData;
26use crate::ops::DagImportPullData;
27use crate::ops::DagPersistent;
28use crate::ops::DagStrip;
29use crate::ops::IdConvert;
30use crate::protocol;
31use crate::protocol::RemoteIdConvertProtocol;
32#[cfg(feature = "render")]
33use crate::render::render_dag;
34use crate::tests::DrawDag;
35use crate::CloneData;
36use crate::Dag;
37use crate::Group;
38use crate::Level;
39use crate::Result;
40use crate::Set;
41use crate::Vertex;
42use crate::VertexListWithOptions;
43
44/// Dag structure for testing purpose.
45pub struct TestDag {
46    pub dag: Dag,
47    pub seg_size: usize,
48    pub dir: tempfile::TempDir,
49    pub output: Arc<Mutex<Vec<String>>>,
50}
51
52impl TestDag {
53    /// Creates a `TestDag` for testing.
54    /// Side effect of the `TestDag` will be removed on drop.
55    pub fn new() -> Self {
56        Self::new_with_segment_size(3)
57    }
58
59    /// Crates a `TestDag` using the given ASCII.
60    ///
61    /// This is just `new`, followed by `drawdag`, with an extra rule that
62    /// comments like "# master: M" at the end can be used to specify master
63    /// heads .
64    pub fn draw(text: &str) -> Self {
65        let mut dag = Self::new();
66        let mut split = text.split("# master:");
67        let text = split.next().unwrap_or("");
68        let master = match split.next() {
69            Some(t) => t.split_whitespace().collect::<Vec<_>>(),
70            None => Vec::new(),
71        };
72        dag.drawdag(text, &master);
73        dag
74    }
75
76    /// Similar to `draw` but creates a lazy client so all vertexes
77    /// in the master group are lazy.
78    pub async fn draw_client(text: &str) -> Self {
79        let server = Self::draw(text);
80        // clone data won't include non-master group.
81        let mut client = server.client_cloned_data().await;
82        tracing::debug!("CLIENT");
83        #[cfg(test)]
84        tracing::debug!("CLIENT: {}", client.dump_state().await);
85        let non_master_heads = {
86            let all = server.dag.all().await.unwrap();
87            let non_master = all.difference(&server.dag.master_group().await.unwrap());
88            let heads = server.dag.heads(non_master).await.unwrap();
89            let iter = heads.iter().await.unwrap();
90            iter.try_collect::<Vec<_>>().await.unwrap()
91        };
92        let heads =
93            VertexListWithOptions::from(non_master_heads).with_desired_group(Group::NON_MASTER);
94        client
95            .dag
96            .add_heads_and_flush(&server.dag.dag_snapshot().unwrap(), &heads)
97            .await
98            .unwrap();
99        client
100    }
101
102    /// Creates a `TestDag` with a specific segment size.
103    pub fn new_with_segment_size(seg_size: usize) -> Self {
104        let dir = tempfile::tempdir().unwrap();
105        let dag = Dag::open(dir.path().join("n")).unwrap();
106        Self {
107            dir,
108            dag,
109            seg_size,
110            output: Default::default(),
111        }
112    }
113
114    /// Reopen the dag. Drop in-memory state including caches.
115    pub fn reopen(&mut self) {
116        let mut dag = Dag::open(self.dir.path().join("n")).unwrap();
117        dag.set_remote_protocol(self.dag.get_remote_protocol());
118        self.dag = dag;
119    }
120
121    /// Add vertexes to the graph. Does not resolve vertexes remotely.
122    pub fn drawdag(&mut self, text: &str, master_heads: &[&str]) {
123        self.drawdag_with_limited_heads(text, master_heads, None);
124    }
125
126    /// Add vertexes to the graph. Async version that might resolve vertexes
127    /// remotely on demand.
128    pub async fn drawdag_async(&mut self, text: &str, master_heads: &[&str]) {
129        // Do not call self.validate to avoid fetching vertexes remotely.
130        self.drawdag_with_limited_heads_async(text, master_heads, None, false)
131            .await
132    }
133
134    /// Add vertexes to the graph.
135    ///
136    /// If `heads` is set, ignore part of the graph. Only consider specified
137    /// heads.
138    pub fn drawdag_with_limited_heads(
139        &mut self,
140        text: &str,
141        master_heads: &[&str],
142        heads: Option<&[&str]>,
143    ) {
144        non_blocking(self.drawdag_with_limited_heads_async(text, master_heads, heads, true))
145            .unwrap()
146    }
147
148    pub async fn drawdag_with_limited_heads_async(
149        &mut self,
150        text: &str,
151        master_heads: &[&str],
152        heads: Option<&[&str]>,
153        validate: bool,
154    ) {
155        let (all_heads, parent_func) = get_heads_and_parents_func_from_ascii(text);
156        let heads = match heads {
157            Some(heads) => heads
158                .iter()
159                .map(|s| Vertex::copy_from(s.as_bytes()))
160                .collect(),
161            None => all_heads,
162        };
163        self.dag.dag.set_new_segment_size(self.seg_size);
164        self.dag
165            .add_heads(&parent_func, &heads.into())
166            .await
167            .unwrap();
168        if validate {
169            self.validate().await;
170        }
171        let problems = self.dag.check_segments().await.unwrap();
172        assert!(
173            problems.is_empty(),
174            "problems after drawdag: {:?}",
175            problems
176        );
177        let master_heads = master_heads
178            .iter()
179            .map(|s| Vertex::copy_from(s.as_bytes()))
180            .collect::<Vec<_>>();
181        let need_flush = !master_heads.is_empty();
182        if need_flush {
183            let heads = VertexListWithOptions::from(master_heads).with_desired_group(Group::MASTER);
184            self.dag.flush(&heads).await.unwrap();
185        }
186        if validate {
187            self.validate().await;
188        }
189        assert_eq!(self.dag.check_segments().await.unwrap(), [] as [String; 0]);
190    }
191
192    /// Add one vertex to the non-master group. `parents` is split by whitespaces.
193    pub async fn add_one_vertex(&mut self, name: &str, parents: &str) {
194        let name = Vertex::copy_from(name.as_bytes());
195        let parents: Vec<Vertex> = parents
196            .split_whitespace()
197            .map(|s| Vertex::copy_from(s.as_bytes()))
198            .collect();
199        let heads =
200            VertexListWithOptions::from(&[name.clone()][..]).with_desired_group(Group::NON_MASTER);
201        self.dag
202            .add_heads(
203                &std::iter::once((name, parents)).collect::<HashMap<Vertex, Vec<Vertex>>>(),
204                &heads,
205            )
206            .await
207            .unwrap();
208    }
209
210    /// Flush space-separated master heads.
211    pub async fn flush(&mut self, master_heads: &str) {
212        let heads: Vec<Vertex> = master_heads
213            .split_whitespace()
214            .map(|v| Vertex::copy_from(v.as_bytes()))
215            .collect();
216        let heads = VertexListWithOptions::from(heads).with_desired_group(Group::MASTER);
217        self.dag.flush(&heads).await.unwrap();
218    }
219
220    /// Replace ASCII with Ids in the graph.
221    pub fn annotate_ascii(&self, text: &str) -> String {
222        self.dag.map.replace(text)
223    }
224
225    /// Render the segments.
226    pub fn render_segments(&self) -> String {
227        format!("{:?}", &self.dag.dag)
228    }
229
230    #[cfg(feature = "render")]
231    /// Render the graph.
232    pub fn render_graph(&self) -> String {
233        render_dag(&self.dag, |v| {
234            Some(
235                non_blocking_result(self.dag.vertex_id(v.clone()))
236                    .unwrap()
237                    .to_string(),
238            )
239        })
240        .unwrap()
241    }
242
243    /// Use this DAG as the "server", return the "client" Dag that has lazy Vertexes.
244    pub async fn client(&self) -> TestDag {
245        let mut client = TestDag::new();
246        client.set_remote(self);
247        client
248    }
249
250    /// Update remote protocol to use the (updated) server graph.
251    pub fn set_remote(&mut self, server_dag: &Self) {
252        let remote = server_dag.remote_protocol(self.output.clone());
253        self.dag.set_remote_protocol(remote);
254    }
255
256    /// Alternative syntax of `set_remote`.
257    pub fn with_remote(mut self, server_dag: &Self) -> Self {
258        self.set_remote(server_dag);
259        self
260    }
261
262    /// Similar to `client`, but also clone the Dag from the server.
263    pub async fn client_cloned_data(&self) -> TestDag {
264        let mut client = self.client().await;
265        let data = self.dag.export_clone_data().await.unwrap();
266        tracing::debug!("clone data: {:?}", &data);
267        client.dag.import_clone_data(data).await.unwrap();
268        client
269    }
270
271    /// Pull from the server Dag using the master fast forward fast path.
272    pub async fn pull_ff_master(
273        &mut self,
274        server: &Self,
275        old_master: impl Into<Set>,
276        new_master: impl Into<Set>,
277    ) -> Result<()> {
278        self.set_remote(server);
279        let old_master = old_master.into();
280        let new_master = new_master.into();
281        let pull_data = server
282            .export_pull_data(old_master.clone(), new_master.clone())
283            .await?;
284        let head_opts = to_head_opts(new_master);
285        self.import_pull_data(pull_data, head_opts).await?;
286        Ok(())
287    }
288
289    /// Generate the "pull data". This is intended to be called from a "server".
290    pub async fn export_pull_data(
291        &self,
292        common: impl Into<Set>,
293        heads: impl Into<Set>,
294    ) -> Result<CloneData<Vertex>> {
295        let missing = self.dag.only(heads.into(), common.into()).await?;
296        let data = self.dag.export_pull_data(&missing).await?;
297        debug!("export_pull_data: {:?}", &data);
298        Ok(data)
299    }
300
301    /// Imports the "pull data". This is intended to be called from a "client".
302    pub async fn import_pull_data(
303        &mut self,
304        pull_data: CloneData<Vertex>,
305        head_opts: impl Into<VertexListWithOptions>,
306    ) -> Result<()> {
307        let head_opts = head_opts.into();
308        self.dag.import_pull_data(pull_data, &head_opts).await?;
309        Ok(())
310    }
311
312    /// Strip space-separated vertexes.
313    pub async fn strip(&mut self, names: &'static str) {
314        let set = Set::from_static_names(names.split(' ').map(|s| s.into()));
315        self.dag.strip(&set).await.unwrap();
316        let problems = self.dag.check_segments().await.unwrap();
317        assert!(problems.is_empty(), "problems after strip: {:?}", problems);
318    }
319
320    /// Remote protocol used to resolve Id <-> Vertex remotely using the test dag
321    /// as the "server".
322    ///
323    /// Logs of the remote access will be written to `output`.
324    pub fn remote_protocol(
325        &self,
326        output: Arc<Mutex<Vec<String>>>,
327    ) -> Arc<dyn RemoteIdConvertProtocol> {
328        let remote = ProtocolMonitor {
329            inner: Box::new(self.dag.try_snapshot().unwrap()),
330            output,
331        };
332        Arc::new(remote)
333    }
334
335    /// Describe segments at the given level and group as a string.
336    pub fn debug_segments(&self, level: Level, group: Group) -> String {
337        let lines =
338            crate::dag::debug_segments_by_level_group(&self.dag.dag, &self.dag.map, level, group);
339        lines
340            .iter()
341            .map(|l| format!("\n        {}", l))
342            .collect::<Vec<String>>()
343            .concat()
344    }
345
346    /// Output of remote protocols since the last call.
347    pub fn output(&self) -> Vec<String> {
348        let mut result = Vec::new();
349        let mut output = self.output.lock().unwrap();
350        std::mem::swap(&mut result, &mut *output);
351        result
352    }
353
354    /// Check that a vertex exists locally.
355    pub fn contains_vertex_locally(&self, name: impl Into<Vertex>) -> bool {
356        non_blocking_result(self.dag.contains_vertex_name_locally(&[name.into()])).unwrap()[0]
357    }
358
359    #[cfg(test)]
360    /// Dump Dag state as a string.
361    pub async fn dump_state(&self) -> String {
362        use crate::iddagstore::tests::dump_store_state;
363        use crate::Id;
364        let iddag = &self.dag.dag;
365        let all = iddag.all().unwrap();
366        let iddag_state = dump_store_state(&iddag.store, &all);
367        let all_str = format!("{:?}", &self.dag.all().await.unwrap());
368        let idmap_state: String = {
369            let all: Vec<Id> = all.iter_asc().collect();
370            let contains = self.dag.contains_vertex_id_locally(&all).await.unwrap();
371            let local_ids: Vec<Id> = all
372                .into_iter()
373                .zip(contains)
374                .filter(|(_, c)| *c)
375                .map(|(i, _)| i)
376                .collect();
377            let local_vertexes = self
378                .dag
379                .vertex_name_batch(&local_ids)
380                .await
381                .unwrap()
382                .into_iter()
383                .collect::<Result<Vec<_>>>()
384                .unwrap();
385            local_ids
386                .into_iter()
387                .zip(local_vertexes)
388                .map(|(i, v)| format!("{:?}->{:?}", i, v))
389                .collect::<Vec<_>>()
390                .join(" ")
391        };
392
393        format!("{}{}\n{}", all_str, iddag_state, idmap_state)
394    }
395
396    #[cfg(test)]
397    /// Dump Dag segments as ASCII string.
398    pub fn dump_segments_ascii(&self) -> String {
399        use std::collections::HashSet;
400
401        use crate::Id;
402        use crate::IdSet;
403        use crate::IdSpan;
404
405        let span_iter = |span: IdSpan| IdSet::from_spans(vec![span]).into_iter().rev();
406        let iddag = &self.dag.dag;
407        let all_ids = iddag.all_ids_in_groups(&Group::ALL).unwrap();
408        let max_level = iddag.max_level().unwrap();
409        let mut output = String::new();
410        for level in 0..=max_level {
411            output = format!("{}\n        Lv{}:", output.trim_end(), level);
412            for span in all_ids.iter_span_asc() {
413                output += " |";
414                let segments = iddag.segments_in_span_ascending(*span, level).unwrap();
415                let segment_ids: HashSet<Id> = segments
416                    .iter()
417                    .flat_map(|s| span_iter(s.span().unwrap()))
418                    .collect();
419                let segment_highs: HashSet<Id> =
420                    segments.iter().map(|s| s.high().unwrap()).collect();
421                for id in span_iter(*span) {
422                    let id_str = format!("{:?}", id);
423                    if segment_ids.contains(&id) {
424                        output += &id_str
425                    } else {
426                        let space = " ".repeat(id_str.len());
427                        output += &space;
428                    };
429                    output.push(
430                        if segment_highs.contains(&id)
431                            || (segment_ids.contains(&(id + 1)) && !segment_ids.contains(&id))
432                        {
433                            '|'
434                        } else {
435                            ' '
436                        },
437                    );
438                }
439            }
440        }
441        output.trim_end().to_string()
442    }
443
444    async fn validate(&self) {
445        // All vertexes should be accessible, and round-trip through IdMap.
446        let mut iter = self.dag.all().await.unwrap().iter().await.unwrap();
447        while let Some(v) = iter.next().await {
448            let v = v.unwrap();
449            let id = self.dag.vertex_id(v.clone()).await.unwrap();
450            let v2 = self.dag.vertex_name(id).await.unwrap();
451            assert_eq!(v, v2);
452        }
453    }
454}
455
456impl Deref for TestDag {
457    type Target = Dag;
458
459    fn deref(&self) -> &Self::Target {
460        &self.dag
461    }
462}
463
464impl DerefMut for TestDag {
465    fn deref_mut(&mut self) -> &mut Self::Target {
466        &mut self.dag
467    }
468}
469
470pub(crate) struct ProtocolMonitor {
471    pub(crate) inner: Box<dyn RemoteIdConvertProtocol>,
472    pub(crate) output: Arc<Mutex<Vec<String>>>,
473}
474
475#[async_trait::async_trait]
476impl RemoteIdConvertProtocol for ProtocolMonitor {
477    async fn resolve_names_to_relative_paths(
478        &self,
479        heads: Vec<Vertex>,
480        names: Vec<Vertex>,
481    ) -> Result<Vec<(protocol::AncestorPath, Vec<Vertex>)>> {
482        let msg = format!("resolve names: {:?}, heads: {:?}", &names, &heads);
483        self.output.lock().unwrap().push(msg);
484        self.inner
485            .resolve_names_to_relative_paths(heads, names)
486            .await
487    }
488
489    async fn resolve_relative_paths_to_names(
490        &self,
491        paths: Vec<protocol::AncestorPath>,
492    ) -> Result<Vec<(protocol::AncestorPath, Vec<Vertex>)>> {
493        let msg = format!("resolve paths: {:?}", &paths);
494        self.output.lock().unwrap().push(msg);
495        self.inner.resolve_relative_paths_to_names(paths).await
496    }
497}
498
499fn get_heads_and_parents_func_from_ascii(text: &str) -> (Vec<Vertex>, DrawDag) {
500    let dag = DrawDag::from(text);
501    let heads = dag.heads();
502    (heads, dag)
503}
504
#[cfg(test)]
impl From<&'static str> for VertexListWithOptions {
    /// Parses `names` into a [`Set`], then reuses the `From<Set>` conversion.
    fn from(names: &str) -> Self {
        Self::from(Set::from(names))
    }
}

#[cfg(test)]
impl From<Set> for VertexListWithOptions {
    /// Treats every vertex of the set as a head desired in the master group.
    fn from(set: Set) -> Self {
        to_head_opts(set)
    }
}
519
520fn to_head_opts(set: Set) -> VertexListWithOptions {
521    use crate::set::SyncSetQuery;
522    let heads_vec = set.iter().unwrap().collect::<Result<Vec<_>>>().unwrap();
523    VertexListWithOptions::from(heads_vec).with_desired_group(Group::MASTER)
524}