1use std::collections::HashMap;
9use std::ops::Deref;
10use std::ops::DerefMut;
11use std::sync::Arc;
12use std::sync::Mutex;
13
14use futures::StreamExt;
15use futures::TryStreamExt;
16use nonblocking::non_blocking;
17use nonblocking::non_blocking_result;
18use tracing::debug;
19
20use crate::ops::CheckIntegrity;
21use crate::ops::DagAddHeads;
22use crate::ops::DagAlgorithm;
23use crate::ops::DagExportCloneData;
24use crate::ops::DagExportPullData;
25use crate::ops::DagImportCloneData;
26use crate::ops::DagImportPullData;
27use crate::ops::DagPersistent;
28use crate::ops::DagStrip;
29use crate::ops::IdConvert;
30use crate::protocol;
31use crate::protocol::RemoteIdConvertProtocol;
32#[cfg(feature = "render")]
33use crate::render::render_dag;
34use crate::tests::DrawDag;
35use crate::CloneData;
36use crate::Dag;
37use crate::Group;
38use crate::Level;
39use crate::Result;
40use crate::Set;
41use crate::Vertex;
42use crate::VertexListWithOptions;
43
44pub struct TestDag {
46 pub dag: Dag,
47 pub seg_size: usize,
48 pub dir: tempfile::TempDir,
49 pub output: Arc<Mutex<Vec<String>>>,
50}
51
52impl TestDag {
53 pub fn new() -> Self {
56 Self::new_with_segment_size(3)
57 }
58
59 pub fn draw(text: &str) -> Self {
65 let mut dag = Self::new();
66 let mut split = text.split("# master:");
67 let text = split.next().unwrap_or("");
68 let master = match split.next() {
69 Some(t) => t.split_whitespace().collect::<Vec<_>>(),
70 None => Vec::new(),
71 };
72 dag.drawdag(text, &master);
73 dag
74 }
75
76 pub async fn draw_client(text: &str) -> Self {
79 let server = Self::draw(text);
80 let mut client = server.client_cloned_data().await;
82 tracing::debug!("CLIENT");
83 #[cfg(test)]
84 tracing::debug!("CLIENT: {}", client.dump_state().await);
85 let non_master_heads = {
86 let all = server.dag.all().await.unwrap();
87 let non_master = all.difference(&server.dag.master_group().await.unwrap());
88 let heads = server.dag.heads(non_master).await.unwrap();
89 let iter = heads.iter().await.unwrap();
90 iter.try_collect::<Vec<_>>().await.unwrap()
91 };
92 let heads =
93 VertexListWithOptions::from(non_master_heads).with_desired_group(Group::NON_MASTER);
94 client
95 .dag
96 .add_heads_and_flush(&server.dag.dag_snapshot().unwrap(), &heads)
97 .await
98 .unwrap();
99 client
100 }
101
102 pub fn new_with_segment_size(seg_size: usize) -> Self {
104 let dir = tempfile::tempdir().unwrap();
105 let dag = Dag::open(dir.path().join("n")).unwrap();
106 Self {
107 dir,
108 dag,
109 seg_size,
110 output: Default::default(),
111 }
112 }
113
114 pub fn reopen(&mut self) {
116 let mut dag = Dag::open(self.dir.path().join("n")).unwrap();
117 dag.set_remote_protocol(self.dag.get_remote_protocol());
118 self.dag = dag;
119 }
120
121 pub fn drawdag(&mut self, text: &str, master_heads: &[&str]) {
123 self.drawdag_with_limited_heads(text, master_heads, None);
124 }
125
126 pub async fn drawdag_async(&mut self, text: &str, master_heads: &[&str]) {
129 self.drawdag_with_limited_heads_async(text, master_heads, None, false)
131 .await
132 }
133
134 pub fn drawdag_with_limited_heads(
139 &mut self,
140 text: &str,
141 master_heads: &[&str],
142 heads: Option<&[&str]>,
143 ) {
144 non_blocking(self.drawdag_with_limited_heads_async(text, master_heads, heads, true))
145 .unwrap()
146 }
147
148 pub async fn drawdag_with_limited_heads_async(
149 &mut self,
150 text: &str,
151 master_heads: &[&str],
152 heads: Option<&[&str]>,
153 validate: bool,
154 ) {
155 let (all_heads, parent_func) = get_heads_and_parents_func_from_ascii(text);
156 let heads = match heads {
157 Some(heads) => heads
158 .iter()
159 .map(|s| Vertex::copy_from(s.as_bytes()))
160 .collect(),
161 None => all_heads,
162 };
163 self.dag.dag.set_new_segment_size(self.seg_size);
164 self.dag
165 .add_heads(&parent_func, &heads.into())
166 .await
167 .unwrap();
168 if validate {
169 self.validate().await;
170 }
171 let problems = self.dag.check_segments().await.unwrap();
172 assert!(
173 problems.is_empty(),
174 "problems after drawdag: {:?}",
175 problems
176 );
177 let master_heads = master_heads
178 .iter()
179 .map(|s| Vertex::copy_from(s.as_bytes()))
180 .collect::<Vec<_>>();
181 let need_flush = !master_heads.is_empty();
182 if need_flush {
183 let heads = VertexListWithOptions::from(master_heads).with_desired_group(Group::MASTER);
184 self.dag.flush(&heads).await.unwrap();
185 }
186 if validate {
187 self.validate().await;
188 }
189 assert_eq!(self.dag.check_segments().await.unwrap(), [] as [String; 0]);
190 }
191
192 pub async fn add_one_vertex(&mut self, name: &str, parents: &str) {
194 let name = Vertex::copy_from(name.as_bytes());
195 let parents: Vec<Vertex> = parents
196 .split_whitespace()
197 .map(|s| Vertex::copy_from(s.as_bytes()))
198 .collect();
199 let heads =
200 VertexListWithOptions::from(&[name.clone()][..]).with_desired_group(Group::NON_MASTER);
201 self.dag
202 .add_heads(
203 &std::iter::once((name, parents)).collect::<HashMap<Vertex, Vec<Vertex>>>(),
204 &heads,
205 )
206 .await
207 .unwrap();
208 }
209
210 pub async fn flush(&mut self, master_heads: &str) {
212 let heads: Vec<Vertex> = master_heads
213 .split_whitespace()
214 .map(|v| Vertex::copy_from(v.as_bytes()))
215 .collect();
216 let heads = VertexListWithOptions::from(heads).with_desired_group(Group::MASTER);
217 self.dag.flush(&heads).await.unwrap();
218 }
219
220 pub fn annotate_ascii(&self, text: &str) -> String {
222 self.dag.map.replace(text)
223 }
224
225 pub fn render_segments(&self) -> String {
227 format!("{:?}", &self.dag.dag)
228 }
229
/// Renders the dag via `render_dag`, using each vertex's id (as a string) as
/// the per-vertex message.
///
/// Panics if an id lookup or the rendering fails.
#[cfg(feature = "render")]
pub fn render_graph(&self) -> String {
    let rendered = render_dag(&self.dag, |vertex| {
        let id = non_blocking_result(self.dag.vertex_id(vertex.clone())).unwrap();
        Some(id.to_string())
    });
    rendered.unwrap()
}
242
243 pub async fn client(&self) -> TestDag {
245 let mut client = TestDag::new();
246 client.set_remote(self);
247 client
248 }
249
250 pub fn set_remote(&mut self, server_dag: &Self) {
252 let remote = server_dag.remote_protocol(self.output.clone());
253 self.dag.set_remote_protocol(remote);
254 }
255
256 pub fn with_remote(mut self, server_dag: &Self) -> Self {
258 self.set_remote(server_dag);
259 self
260 }
261
262 pub async fn client_cloned_data(&self) -> TestDag {
264 let mut client = self.client().await;
265 let data = self.dag.export_clone_data().await.unwrap();
266 tracing::debug!("clone data: {:?}", &data);
267 client.dag.import_clone_data(data).await.unwrap();
268 client
269 }
270
271 pub async fn pull_ff_master(
273 &mut self,
274 server: &Self,
275 old_master: impl Into<Set>,
276 new_master: impl Into<Set>,
277 ) -> Result<()> {
278 self.set_remote(server);
279 let old_master = old_master.into();
280 let new_master = new_master.into();
281 let pull_data = server
282 .export_pull_data(old_master.clone(), new_master.clone())
283 .await?;
284 let head_opts = to_head_opts(new_master);
285 self.import_pull_data(pull_data, head_opts).await?;
286 Ok(())
287 }
288
289 pub async fn export_pull_data(
291 &self,
292 common: impl Into<Set>,
293 heads: impl Into<Set>,
294 ) -> Result<CloneData<Vertex>> {
295 let missing = self.dag.only(heads.into(), common.into()).await?;
296 let data = self.dag.export_pull_data(&missing).await?;
297 debug!("export_pull_data: {:?}", &data);
298 Ok(data)
299 }
300
301 pub async fn import_pull_data(
303 &mut self,
304 pull_data: CloneData<Vertex>,
305 head_opts: impl Into<VertexListWithOptions>,
306 ) -> Result<()> {
307 let head_opts = head_opts.into();
308 self.dag.import_pull_data(pull_data, &head_opts).await?;
309 Ok(())
310 }
311
312 pub async fn strip(&mut self, names: &'static str) {
314 let set = Set::from_static_names(names.split(' ').map(|s| s.into()));
315 self.dag.strip(&set).await.unwrap();
316 let problems = self.dag.check_segments().await.unwrap();
317 assert!(problems.is_empty(), "problems after strip: {:?}", problems);
318 }
319
320 pub fn remote_protocol(
325 &self,
326 output: Arc<Mutex<Vec<String>>>,
327 ) -> Arc<dyn RemoteIdConvertProtocol> {
328 let remote = ProtocolMonitor {
329 inner: Box::new(self.dag.try_snapshot().unwrap()),
330 output,
331 };
332 Arc::new(remote)
333 }
334
335 pub fn debug_segments(&self, level: Level, group: Group) -> String {
337 let lines =
338 crate::dag::debug_segments_by_level_group(&self.dag.dag, &self.dag.map, level, group);
339 lines
340 .iter()
341 .map(|l| format!("\n {}", l))
342 .collect::<Vec<String>>()
343 .concat()
344 }
345
346 pub fn output(&self) -> Vec<String> {
348 let mut result = Vec::new();
349 let mut output = self.output.lock().unwrap();
350 std::mem::swap(&mut result, &mut *output);
351 result
352 }
353
354 pub fn contains_vertex_locally(&self, name: impl Into<Vertex>) -> bool {
356 non_blocking_result(self.dag.contains_vertex_name_locally(&[name.into()])).unwrap()[0]
357 }
358
/// Dumps the dag state as text.
///
/// The output is: the debug form of all vertexes, immediately followed by
/// the store state from `dump_store_state`, then a newline and a
/// space-separated list of `id->vertex` pairs for the ids that
/// `contains_vertex_id_locally` reports as local.
#[cfg(test)]
pub async fn dump_state(&self) -> String {
    use crate::iddagstore::tests::dump_store_state;
    use crate::Id;

    let iddag = &self.dag.dag;
    let id_set = iddag.all().unwrap();
    let store_dump = dump_store_state(&iddag.store, &id_set);
    let vertex_dump = format!("{:?}", &self.dag.all().await.unwrap());

    // Keep only the ids whose names are available locally.
    let ids: Vec<Id> = id_set.iter_asc().collect();
    let is_local = self.dag.contains_vertex_id_locally(&ids).await.unwrap();
    let mut local_ids: Vec<Id> = Vec::new();
    for (id, local) in ids.into_iter().zip(is_local) {
        if local {
            local_ids.push(id);
        }
    }

    let local_names = self
        .dag
        .vertex_name_batch(&local_ids)
        .await
        .unwrap()
        .into_iter()
        .collect::<Result<Vec<_>>>()
        .unwrap();
    let mut pairs: Vec<String> = Vec::new();
    for (id, name) in local_ids.into_iter().zip(local_names) {
        pairs.push(format!("{:?}->{:?}", id, name));
    }
    let idmap_dump = pairs.join(" ");

    format!("{}{}\n{}", vertex_dump, store_dump, idmap_dump)
}
395
/// Renders, per level, which ids are covered by segments.
///
/// Each level gets a `Lv{level}:` line. For every span from
/// `all_ids_in_groups(Group::ALL)`, the ids are listed in the order yielded
/// by the reversed `IdSet` iterator. An id is printed if it is covered by a
/// segment at that level and is otherwise replaced by spaces of equal
/// width. It is followed by `|` if it is a segment high, or if it is
/// uncovered while `id + 1` is covered; otherwise by a space.
#[cfg(test)]
pub fn dump_segments_ascii(&self) -> String {
    use std::collections::HashSet;

    use crate::Id;
    use crate::IdSet;
    use crate::IdSpan;

    let ids_in = |span: IdSpan| IdSet::from_spans(vec![span]).into_iter().rev();
    let iddag = &self.dag.dag;
    let all_ids = iddag.all_ids_in_groups(&Group::ALL).unwrap();
    let max_level = iddag.max_level().unwrap();

    let mut text = String::new();
    for level in 0..=max_level {
        // Drop trailing whitespace left by the previous level, then start a
        // new line for this level.
        let kept = text.trim_end().len();
        text.truncate(kept);
        text.push_str(&format!("\n Lv{}:", level));

        for span in all_ids.iter_span_asc() {
            text.push_str(" |");
            let segments = iddag.segments_in_span_ascending(*span, level).unwrap();
            let covered: HashSet<Id> = segments
                .iter()
                .flat_map(|seg| ids_in(seg.span().unwrap()))
                .collect();
            let highs: HashSet<Id> = segments.iter().map(|seg| seg.high().unwrap()).collect();

            for id in ids_in(*span) {
                let label = format!("{:?}", id);
                let shown = covered.contains(&id);
                if shown {
                    text.push_str(&label);
                } else {
                    text.push_str(&" ".repeat(label.len()));
                }
                let boundary = highs.contains(&id) || (covered.contains(&(id + 1)) && !shown);
                text.push(if boundary { '|' } else { ' ' });
            }
        }
    }
    text.trim_end().to_string()
}
443
444 async fn validate(&self) {
445 let mut iter = self.dag.all().await.unwrap().iter().await.unwrap();
447 while let Some(v) = iter.next().await {
448 let v = v.unwrap();
449 let id = self.dag.vertex_id(v.clone()).await.unwrap();
450 let v2 = self.dag.vertex_name(id).await.unwrap();
451 assert_eq!(v, v2);
452 }
453 }
454}
455
456impl Deref for TestDag {
457 type Target = Dag;
458
459 fn deref(&self) -> &Self::Target {
460 &self.dag
461 }
462}
463
464impl DerefMut for TestDag {
465 fn deref_mut(&mut self) -> &mut Self::Target {
466 &mut self.dag
467 }
468}
469
470pub(crate) struct ProtocolMonitor {
471 pub(crate) inner: Box<dyn RemoteIdConvertProtocol>,
472 pub(crate) output: Arc<Mutex<Vec<String>>>,
473}
474
475#[async_trait::async_trait]
476impl RemoteIdConvertProtocol for ProtocolMonitor {
477 async fn resolve_names_to_relative_paths(
478 &self,
479 heads: Vec<Vertex>,
480 names: Vec<Vertex>,
481 ) -> Result<Vec<(protocol::AncestorPath, Vec<Vertex>)>> {
482 let msg = format!("resolve names: {:?}, heads: {:?}", &names, &heads);
483 self.output.lock().unwrap().push(msg);
484 self.inner
485 .resolve_names_to_relative_paths(heads, names)
486 .await
487 }
488
489 async fn resolve_relative_paths_to_names(
490 &self,
491 paths: Vec<protocol::AncestorPath>,
492 ) -> Result<Vec<(protocol::AncestorPath, Vec<Vertex>)>> {
493 let msg = format!("resolve paths: {:?}", &paths);
494 self.output.lock().unwrap().push(msg);
495 self.inner.resolve_relative_paths_to_names(paths).await
496 }
497}
498
499fn get_heads_and_parents_func_from_ascii(text: &str) -> (Vec<Vertex>, DrawDag) {
500 let dag = DrawDag::from(text);
501 let heads = dag.heads();
502 (heads, dag)
503}
504
/// Test-only conversion: builds a `Set` from `names`, then converts it via
/// the `From<Set>` impl.
#[cfg(test)]
impl From<&'static str> for VertexListWithOptions {
    fn from(names: &str) -> Self {
        Set::from(names).into()
    }
}
512
/// Test-only conversion: delegates to `to_head_opts`, so the resulting heads
/// use the `MASTER` desired group.
#[cfg(test)]
impl From<Set> for VertexListWithOptions {
    fn from(names: Set) -> Self {
        let head_opts = to_head_opts(names);
        head_opts
    }
}
519
520fn to_head_opts(set: Set) -> VertexListWithOptions {
521 use crate::set::SyncSetQuery;
522 let heads_vec = set.iter().unwrap().collect::<Result<Vec<_>>>().unwrap();
523 VertexListWithOptions::from(heads_vec).with_desired_group(Group::MASTER)
524}