1use std::{
2 collections::{HashMap, HashSet},
3 convert::TryInto,
4 fs,
5 io::Read,
6 path::{Path as StdPath, PathBuf},
7 str,
8};
9
10use gen_core::{HashId, Strand, config::Workspace, is_terminal, traits::Capnp};
11use itertools::Itertools;
12use rusqlite::{
13 session::{ChangesetItem, ChangesetIter},
14 types::FromSql,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::{
19 accession::{Accession, AccessionEdge, AccessionEdgeData, AccessionPath},
20 annotations::{
21 Annotation, AnnotationError, AnnotationGroup, AnnotationGroupError, AnnotationGroupSample,
22 },
23 block_group::{BlockGroup, NewBlockGroup},
24 block_group_edge::{BlockGroupEdge, BlockGroupEdgeData},
25 collection::Collection,
26 db::GraphConnection,
27 edge::{Edge, EdgeData},
28 errors::ChangesetError,
29 gen_models_capnp::{changeset_models, database_changeset},
30 node::Node,
31 operations::Operation,
32 path::Path,
33 path_edge::PathEdge,
34 sample::Sample,
35 sample_lineage::SampleLineage,
36 sequence::{NewSequence, Sequence},
37 session_operations::DependencyModels,
38 traits::Query,
39};
40
/// A set of decoded changeset models paired with a database path.
///
/// Serialized with serde and (via the `Capnp` impl in this module) as a
/// Cap'n Proto `database_changeset` message.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct DatabaseChangeset {
    /// Database path stored alongside the changes (written/read as the `db_path` field).
    pub db_path: String,
    /// The decoded rows carried by this changeset.
    pub changes: ChangesetModels,
}
46
47impl DatabaseChangeset {
48 pub fn get_db_path(path: &StdPath) -> String {
49 use capnp::serialize_packed;
50
51 let file = fs::File::open(path).unwrap();
52 let mut reader = std::io::BufReader::new(file);
53
54 let message_reader =
55 serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new())
56 .unwrap();
57 let root = message_reader
58 .get_root::<database_changeset::Reader>()
59 .unwrap();
60 root.get_db_path().unwrap().to_string().unwrap()
61 }
62}
63
64impl<'a> Capnp<'a> for DatabaseChangeset {
65 type Builder = database_changeset::Builder<'a>;
66 type Reader = database_changeset::Reader<'a>;
67
68 fn write_capnp(&self, builder: &mut Self::Builder) {
69 builder.set_db_path(&self.db_path);
70 let mut changeset_builder = builder.reborrow().init_changes();
71 self.changes.write_capnp(&mut changeset_builder);
72 }
73
74 fn read_capnp(reader: Self::Reader) -> Self {
75 let db_path = reader.get_db_path().unwrap().to_string().unwrap();
76 let changeset_reader = reader.get_changes().unwrap();
77 let changes = ChangesetModels::read_capnp(changeset_reader);
78 DatabaseChangeset { db_path, changes }
79 }
80}
81
82#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
83pub struct ChangesetModels {
84 pub collections: Vec<crate::collection::Collection>,
85 pub samples: Vec<crate::sample::Sample>,
86 pub sample_lineages: Vec<SampleLineage>,
87 pub sequences: Vec<Sequence>,
88 pub block_groups: Vec<BlockGroup>,
89 pub nodes: Vec<Node>,
90 pub edges: Vec<Edge>,
91 pub block_group_edges: Vec<crate::block_group_edge::BlockGroupEdge>,
92 pub paths: Vec<Path>,
93 pub path_edges: Vec<PathEdge>,
94 pub accessions: Vec<Accession>,
95 pub accession_edges: Vec<AccessionEdge>,
96 pub accession_paths: Vec<AccessionPath>,
97 pub annotation_groups: Vec<AnnotationGroup>,
98 pub annotations: Vec<Annotation>,
99 pub annotation_group_samples: Vec<AnnotationGroupSample>,
100}
101
102impl<'a> Capnp<'a> for ChangesetModels {
103 type Builder = changeset_models::Builder<'a>;
104 type Reader = changeset_models::Reader<'a>;
105
106 fn write_capnp(&self, builder: &mut Self::Builder) {
107 let mut collections_builder = builder
109 .reborrow()
110 .init_collections(self.collections.len() as u32);
111 for (i, collection) in self.collections.iter().enumerate() {
112 let mut collection_builder = collections_builder.reborrow().get(i as u32);
113 collection.write_capnp(&mut collection_builder);
114 }
115
116 let mut samples_builder = builder.reborrow().init_samples(self.samples.len() as u32);
118 for (i, sample) in self.samples.iter().enumerate() {
119 let mut sample_builder = samples_builder.reborrow().get(i as u32);
120 sample.write_capnp(&mut sample_builder);
121 }
122
123 let mut sample_lineages_builder = builder
124 .reborrow()
125 .init_sample_lineages(self.sample_lineages.len() as u32);
126 for (i, sample_lineage) in self.sample_lineages.iter().enumerate() {
127 let mut sample_lineage_builder = sample_lineages_builder.reborrow().get(i as u32);
128 sample_lineage.write_capnp(&mut sample_lineage_builder);
129 }
130
131 let mut sequences_builder = builder
133 .reborrow()
134 .init_sequences(self.sequences.len() as u32);
135 for (i, sequence) in self.sequences.iter().enumerate() {
136 let mut sequence_builder = sequences_builder.reborrow().get(i as u32);
137 sequence.write_capnp(&mut sequence_builder);
138 }
139
140 let mut block_groups_builder = builder
142 .reborrow()
143 .init_block_groups(self.block_groups.len() as u32);
144 for (i, block_group) in self.block_groups.iter().enumerate() {
145 let mut block_group_builder = block_groups_builder.reborrow().get(i as u32);
146 block_group.write_capnp(&mut block_group_builder);
147 }
148
149 let mut nodes_builder = builder.reborrow().init_nodes(self.nodes.len() as u32);
151 for (i, node) in self.nodes.iter().enumerate() {
152 let mut node_builder = nodes_builder.reborrow().get(i as u32);
153 node.write_capnp(&mut node_builder);
154 }
155
156 let mut edges_builder = builder.reborrow().init_edges(self.edges.len() as u32);
158 for (i, edge) in self.edges.iter().enumerate() {
159 let mut edge_builder = edges_builder.reborrow().get(i as u32);
160 edge.write_capnp(&mut edge_builder);
161 }
162
163 let mut block_group_edges_builder = builder
165 .reborrow()
166 .init_block_group_edges(self.block_group_edges.len() as u32);
167 for (i, block_group_edge) in self.block_group_edges.iter().enumerate() {
168 let mut block_group_edge_builder = block_group_edges_builder.reborrow().get(i as u32);
169 block_group_edge.write_capnp(&mut block_group_edge_builder);
170 }
171
172 let mut paths_builder = builder.reborrow().init_paths(self.paths.len() as u32);
174 for (i, path) in self.paths.iter().enumerate() {
175 let mut path_builder = paths_builder.reborrow().get(i as u32);
176 path.write_capnp(&mut path_builder);
177 }
178
179 let mut path_edges_builder = builder
181 .reborrow()
182 .init_path_edges(self.path_edges.len() as u32);
183 for (i, path_edge) in self.path_edges.iter().enumerate() {
184 let mut path_edge_builder = path_edges_builder.reborrow().get(i as u32);
185 path_edge.write_capnp(&mut path_edge_builder);
186 }
187
188 let mut accessions_builder = builder
190 .reborrow()
191 .init_accessions(self.accessions.len() as u32);
192 for (i, accession) in self.accessions.iter().enumerate() {
193 let mut accession_builder = accessions_builder.reborrow().get(i as u32);
194 accession.write_capnp(&mut accession_builder);
195 }
196
197 let mut accession_edges_builder = builder
199 .reborrow()
200 .init_accession_edges(self.accession_edges.len() as u32);
201 for (i, accession_edge) in self.accession_edges.iter().enumerate() {
202 let mut accession_edge_builder = accession_edges_builder.reborrow().get(i as u32);
203 accession_edge.write_capnp(&mut accession_edge_builder);
204 }
205
206 let mut accession_paths_builder = builder
208 .reborrow()
209 .init_accession_paths(self.accession_paths.len() as u32);
210 for (i, accession_path) in self.accession_paths.iter().enumerate() {
211 let mut accession_path_builder = accession_paths_builder.reborrow().get(i as u32);
212 accession_path.write_capnp(&mut accession_path_builder);
213 }
214
215 let mut annotation_groups_builder = builder
217 .reborrow()
218 .init_annotation_groups(self.annotation_groups.len() as u32);
219 for (i, annotation_group) in self.annotation_groups.iter().enumerate() {
220 let mut annotation_group_builder = annotation_groups_builder.reborrow().get(i as u32);
221 annotation_group.write_capnp(&mut annotation_group_builder);
222 }
223
224 let mut annotations_builder = builder
226 .reborrow()
227 .init_annotations(self.annotations.len() as u32);
228 for (i, annotation) in self.annotations.iter().enumerate() {
229 let mut annotation_builder = annotations_builder.reborrow().get(i as u32);
230 annotation.write_capnp(&mut annotation_builder);
231 }
232
233 let mut annotation_group_samples_builder = builder
235 .reborrow()
236 .init_annotation_group_samples(self.annotation_group_samples.len() as u32);
237 for (i, annotation_group_sample) in self.annotation_group_samples.iter().enumerate() {
238 let mut annotation_group_sample_builder =
239 annotation_group_samples_builder.reborrow().get(i as u32);
240 annotation_group_sample.write_capnp(&mut annotation_group_sample_builder);
241 }
242 }
243
244 fn read_capnp(reader: Self::Reader) -> Self {
245 let collections_reader = reader.get_collections().unwrap();
247 let mut collections = Vec::new();
248 for collection_reader in collections_reader.iter() {
249 collections.push(crate::collection::Collection::read_capnp(collection_reader));
250 }
251
252 let samples_reader = reader.get_samples().unwrap();
254 let mut samples = Vec::new();
255 for sample_reader in samples_reader.iter() {
256 samples.push(crate::sample::Sample::read_capnp(sample_reader));
257 }
258
259 let sample_lineages_reader = reader.get_sample_lineages().unwrap();
260 let mut sample_lineages = Vec::new();
261 for sample_lineage_reader in sample_lineages_reader.iter() {
262 sample_lineages.push(SampleLineage::read_capnp(sample_lineage_reader));
263 }
264
265 let sequences_reader = reader.get_sequences().unwrap();
267 let mut sequences = Vec::new();
268 for sequence_reader in sequences_reader.iter() {
269 sequences.push(Sequence::read_capnp(sequence_reader));
270 }
271
272 let block_groups_reader = reader.get_block_groups().unwrap();
274 let mut block_groups = Vec::new();
275 for block_group_reader in block_groups_reader.iter() {
276 block_groups.push(BlockGroup::read_capnp(block_group_reader));
277 }
278
279 let nodes_reader = reader.get_nodes().unwrap();
281 let mut nodes = Vec::new();
282 for node_reader in nodes_reader.iter() {
283 nodes.push(Node::read_capnp(node_reader));
284 }
285
286 let edges_reader = reader.get_edges().unwrap();
288 let mut edges = Vec::new();
289 for edge_reader in edges_reader.iter() {
290 edges.push(Edge::read_capnp(edge_reader));
291 }
292
293 let block_group_edges_reader = reader.get_block_group_edges().unwrap();
295 let mut block_group_edges = Vec::new();
296 for block_group_edge_reader in block_group_edges_reader.iter() {
297 block_group_edges.push(crate::block_group_edge::BlockGroupEdge::read_capnp(
298 block_group_edge_reader,
299 ));
300 }
301
302 let paths_reader = reader.get_paths().unwrap();
304 let mut paths = Vec::new();
305 for path_reader in paths_reader.iter() {
306 paths.push(Path::read_capnp(path_reader));
307 }
308
309 let path_edges_reader = reader.get_path_edges().unwrap();
311 let mut path_edges = Vec::new();
312 for path_edge_reader in path_edges_reader.iter() {
313 path_edges.push(PathEdge::read_capnp(path_edge_reader));
314 }
315
316 let accessions_reader = reader.get_accessions().unwrap();
318 let mut accessions = Vec::new();
319 for accession_reader in accessions_reader.iter() {
320 accessions.push(Accession::read_capnp(accession_reader));
321 }
322
323 let accession_edges_reader = reader.get_accession_edges().unwrap();
325 let mut accession_edges = Vec::new();
326 for accession_edge_reader in accession_edges_reader.iter() {
327 accession_edges.push(AccessionEdge::read_capnp(accession_edge_reader));
328 }
329
330 let accession_paths_reader = reader.get_accession_paths().unwrap();
332 let mut accession_paths = Vec::new();
333 for accession_path_reader in accession_paths_reader.iter() {
334 accession_paths.push(AccessionPath::read_capnp(accession_path_reader));
335 }
336
337 let annotation_groups_reader = reader.get_annotation_groups().unwrap();
339 let mut annotation_groups = Vec::new();
340 for annotation_group_reader in annotation_groups_reader.iter() {
341 annotation_groups.push(AnnotationGroup::read_capnp(annotation_group_reader));
342 }
343
344 let annotations_reader = reader.get_annotations().unwrap();
346 let mut annotations = Vec::new();
347 for annotation_reader in annotations_reader.iter() {
348 annotations.push(Annotation::read_capnp(annotation_reader));
349 }
350
351 let annotation_group_samples_reader = reader.get_annotation_group_samples().unwrap();
353 let mut annotation_group_samples = Vec::new();
354 for annotation_group_sample_reader in annotation_group_samples_reader.iter() {
355 annotation_group_samples.push(AnnotationGroupSample::read_capnp(
356 annotation_group_sample_reader,
357 ));
358 }
359
360 ChangesetModels {
361 collections,
362 samples,
363 sample_lineages,
364 sequences,
365 block_groups,
366 nodes,
367 edges,
368 block_group_edges,
369 paths,
370 path_edges,
371 accessions,
372 accession_edges,
373 accession_paths,
374 annotation_groups,
375 annotations,
376 annotation_group_samples,
377 }
378 }
379}
380
381pub fn parse_string(item: &ChangesetItem, col: usize) -> String {
383 str::from_utf8(item.new_value(col).unwrap().as_bytes().unwrap())
384 .unwrap()
385 .to_string()
386}
387
388pub fn parse_maybe_string(item: &ChangesetItem, col: usize) -> Option<String> {
389 item.new_value(col)
390 .unwrap()
391 .as_bytes_or_null()
392 .unwrap()
393 .map(|v| str::from_utf8(v).unwrap().to_string())
394}
395
396pub fn parse_blob(item: &ChangesetItem, col: usize) -> [u8; 32] {
397 let bytes = item.new_value(col).unwrap().as_bytes().unwrap();
398
399 bytes.try_into().expect("blob must be exactly 32 bytes")
400}
401
402pub fn parse_maybe_blob(item: &ChangesetItem, col: usize) -> Option<[u8; 32]> {
403 item.new_value(col)
404 .unwrap()
405 .as_bytes_or_null()
406 .unwrap()
407 .map(|v| v.try_into().expect("blob must be exactly 32 bytes"))
408}
409
410pub fn parse_hashid(item: &ChangesetItem, col: usize) -> HashId {
411 HashId(parse_blob(item, col))
412}
413
414pub fn parse_maybe_hashid(item: &ChangesetItem, col: usize) -> Option<HashId> {
415 parse_maybe_blob(item, col).map(HashId)
416}
417
418pub fn parse_number(item: &ChangesetItem, col: usize) -> i64 {
419 item.new_value(col).unwrap().as_i64().unwrap()
420}
421
422pub fn parse_maybe_number(item: &ChangesetItem, col: usize) -> Option<i64> {
423 item.new_value(col).unwrap().as_i64_or_null().unwrap()
424}
425
426pub fn process_changesetiter(
427 conn: &GraphConnection,
428 mut changes: &[u8],
429) -> (ChangesetModels, DependencyModels) {
430 use fallible_streaming_iterator::FallibleStreamingIterator;
431 use gen_core::is_terminal;
432 use itertools::Itertools;
433
434 use crate::{
435 accession::{Accession, AccessionEdge},
436 block_group::BlockGroup,
437 edge::Edge,
438 node::Node,
439 path::Path,
440 sequence::Sequence,
441 traits::Query,
442 };
443
444 let mut created_block_groups = vec![];
446 let mut created_edges = vec![];
447 let mut created_nodes = vec![];
448 let mut created_sequences = vec![];
449 let mut created_bg_edges = vec![];
450 let mut created_samples = vec![];
451 let mut created_sample_lineages = vec![];
452 let mut created_collections = vec![];
453 let mut created_paths = vec![];
454 let mut created_path_edges = vec![];
455 let mut created_accessions = vec![];
456 let mut created_accession_edges = vec![];
457 let mut created_accession_paths = vec![];
458 let mut created_annotation_groups = vec![];
459 let mut created_annotations = vec![];
460 let mut created_annotation_group_samples = vec![];
461
462 let mut previous_collections = HashSet::new();
464 let mut previous_samples = HashSet::new();
465 let mut previous_block_groups = HashSet::new();
466 let mut previous_edges = HashSet::new();
467 let mut previous_paths = HashSet::new();
468 let mut previous_accessions = HashSet::new();
469 let mut previous_nodes = HashSet::new();
470 let mut previous_sequences = HashSet::new();
471 let mut previous_accession_edges = HashSet::new();
472 let mut created_block_groups_set = HashSet::new();
473 let mut created_paths_set = HashSet::new();
474 let mut created_accessions_set = HashSet::new();
475 let mut created_edges_set = HashSet::new();
476 let mut created_accession_edges_set = HashSet::new();
477 let mut created_nodes_set = HashSet::new();
478 let mut created_sequences_set = HashSet::new();
479 let mut created_samples_set: HashSet<String> = HashSet::new();
480 let mut created_collections_set: HashSet<String> = HashSet::new();
481
482 let input: &mut dyn Read = &mut changes;
483 let mut iter = ChangesetIter::start_strm(&input).unwrap();
484
485 while let Some(item) = iter.next().unwrap() {
486 let op = item.op().unwrap();
487 if !op.indirect() {
489 let table = op.table_name();
490 let pk_column = item
491 .pk()
492 .unwrap()
493 .iter()
494 .find_position(|item| **item == 1)
495 .unwrap()
496 .0;
497 match table {
498 "sequences" => {
499 let hash = parse_hashid(item, pk_column);
500 let sequence = Sequence::new()
501 .sequence_type(&parse_string(item, 1))
502 .sequence(&parse_string(item, 2))
503 .name(&parse_string(item, 3))
504 .file_path(&parse_string(item, 4))
505 .length(parse_number(item, 5))
506 .build();
507 assert_eq!(hash, sequence.hash);
508 created_sequences.push(sequence);
509 created_sequences_set.insert(hash);
510 }
511 "block_groups" => {
512 let bg_pk = HashId(parse_blob(item, pk_column));
513 let collection = parse_string(item, 1);
514 let sample_name = parse_string(item, 2);
515 let name = parse_string(item, 3);
516 let created_on = parse_number(item, 4);
517 let parent_block_group_id = parse_maybe_hashid(item, 5);
518
519 created_block_groups.push(BlockGroup {
520 id: bg_pk,
521 collection_name: collection.clone(),
522 sample_name: sample_name.clone(),
523 name,
524 created_on,
525 parent_block_group_id,
526 is_default: parse_number(item, 6) != 0,
527 });
528
529 created_block_groups_set.insert(bg_pk);
530 if let Some(parent_block_group_id) = parent_block_group_id
531 && !created_block_groups_set.contains(&parent_block_group_id)
532 {
533 previous_block_groups.insert(parent_block_group_id);
534 }
535 if !created_collections_set.contains(&collection) {
536 previous_collections.insert(collection);
537 }
538 if !created_samples_set.contains(&sample_name) {
539 previous_samples.insert(sample_name);
540 }
541 }
542 "nodes" => {
543 let node_id = parse_hashid(item, pk_column);
544 let sequence_hash = parse_hashid(item, 1);
545
546 created_nodes.push(Node {
547 id: node_id,
548 sequence_hash,
549 });
550
551 created_nodes_set.insert(node_id);
552 if !created_sequences_set.contains(&sequence_hash) {
553 previous_sequences.insert(sequence_hash);
554 }
555 }
556 "edges" => {
557 let edge_id = HashId(parse_blob(item, pk_column));
558 let source_node_id = parse_hashid(item, 1);
559 let target_node_id = parse_hashid(item, 4);
560
561 created_edges.push(Edge {
562 id: edge_id,
563 source_node_id,
564 source_coordinate: parse_number(item, 2),
565 source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
566 target_node_id,
567 target_coordinate: parse_number(item, 5),
568 target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
569 });
570
571 created_edges_set.insert(edge_id);
572 let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
573 for node in nodes.iter() {
574 if !created_nodes_set.contains(&node.id) && !is_terminal(node.id) {
575 previous_sequences.insert(node.sequence_hash);
576 previous_nodes.insert(node.id);
577 }
578 }
579 }
580 "block_group_edges" => {
581 let bg_id = HashId(parse_blob(item, 1));
582 let edge_id = HashId(parse_blob(item, 2));
583
584 created_bg_edges.push(BlockGroupEdge {
585 id: HashId(parse_blob(item, pk_column)),
586 block_group_id: bg_id,
587 edge_id,
588 chromosome_index: parse_number(item, 3),
589 phased: parse_number(item, 4),
590 created_on: parse_number(item, 5),
591 });
592
593 if !created_edges_set.contains(&edge_id) {
594 previous_edges.insert(edge_id);
595 }
596 if !created_block_groups_set.contains(&bg_id) {
597 previous_block_groups.insert(bg_id);
598 }
599 }
600 "samples" => {
601 let name = parse_string(item, pk_column);
602 created_samples.push(Sample { name: name.clone() });
603 created_samples_set.insert(name);
604 }
605 "sample_lineage" => {
606 let parent_sample_name = parse_string(item, 0);
607 let child_sample_name = parse_string(item, 1);
608
609 created_sample_lineages.push(SampleLineage {
610 parent_sample_name: parent_sample_name.clone(),
611 child_sample_name: child_sample_name.clone(),
612 });
613
614 if !created_samples_set.contains(&parent_sample_name) {
615 previous_samples.insert(parent_sample_name);
616 }
617 if !created_samples_set.contains(&child_sample_name) {
618 previous_samples.insert(child_sample_name);
619 }
620 }
621 "collections" => {
622 let name = parse_string(item, pk_column);
623 created_collections.push(Collection { name: name.clone() });
624 created_collections_set.insert(name);
625 }
626 "paths" => {
627 let path_id = HashId(parse_blob(item, pk_column));
628 let bg_id = HashId(parse_blob(item, 1));
629
630 created_paths.push(Path {
631 id: path_id,
632 block_group_id: bg_id,
633 name: parse_string(item, 2),
634 created_on: parse_number(item, 3),
635 });
636
637 created_paths_set.insert(path_id);
638 if !created_block_groups_set.contains(&bg_id) {
639 previous_block_groups.insert(bg_id);
640 }
641 }
642 "path_edges" => {
643 let path_id = HashId(parse_blob(item, 1));
644 let edge_id = HashId(parse_blob(item, 2));
645
646 created_path_edges.push(PathEdge {
647 id: HashId(parse_blob(item, pk_column)),
648 path_id,
649 edge_id,
650 index_in_path: parse_number(item, 3),
651 });
652
653 if !created_paths_set.contains(&path_id) {
654 previous_paths.insert(path_id);
655 }
656 if !created_edges_set.contains(&edge_id) {
657 previous_edges.insert(edge_id);
658 }
659 }
660 "accessions" => {
661 let accession_id = HashId(parse_blob(item, pk_column));
662 let path_id = HashId(parse_blob(item, 2));
663 let parent_accession_id = parse_maybe_hashid(item, 3);
664
665 created_accessions.push(Accession {
666 id: accession_id,
667 name: parse_string(item, 1),
668 path_id,
669 parent_accession_id,
670 });
671
672 created_accessions_set.insert(accession_id);
673 if !created_paths_set.contains(&path_id) {
674 previous_paths.insert(path_id);
675 }
676 if let Some(id) = parent_accession_id
677 && !created_accessions_set.contains(&id)
678 {
679 previous_accessions.insert(id);
680 }
681 }
682 "accession_edges" => {
683 let edge_id = parse_hashid(item, pk_column);
684 let source_node_id = parse_hashid(item, 1);
685 let target_node_id = parse_hashid(item, 4);
686
687 created_accession_edges.push(AccessionEdge {
688 id: edge_id,
689 source_node_id,
690 source_coordinate: parse_number(item, 2),
691 source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
692 target_node_id,
693 target_coordinate: parse_number(item, 5),
694 target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
695 chromosome_index: parse_number(item, 7),
696 });
697
698 created_accession_edges_set.insert(edge_id);
699 let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
700 if !created_nodes_set.contains(&source_node_id) {
701 previous_sequences.insert(nodes[0].sequence_hash);
702 }
703 if source_node_id != target_node_id
704 && !created_nodes_set.contains(&target_node_id)
705 {
706 previous_sequences.insert(nodes[1].sequence_hash);
707 }
708 }
709 "accession_paths" => {
710 let accession_id = parse_hashid(item, 1);
711 let edge_id = parse_hashid(item, 3);
712
713 created_accession_paths.push(AccessionPath {
714 id: parse_hashid(item, pk_column),
715 accession_id,
716 index_in_path: parse_number(item, 2),
717 edge_id,
718 });
719
720 if !created_accessions_set.contains(&accession_id) {
721 previous_accessions.insert(accession_id);
722 }
723 if !created_accession_edges_set.contains(&edge_id) {
724 previous_accession_edges.insert(edge_id);
725 }
726 }
727 "annotations" => {
728 let id = parse_hashid(item, pk_column);
729 let name = parse_string(item, 1);
730 let group = parse_string(item, 2);
731 let accession_id = parse_hashid(item, 3);
732
733 created_annotations.push(Annotation {
734 id,
735 name,
736 group,
737 accession_id,
738 });
739
740 if !created_accessions_set.contains(&accession_id) {
741 previous_accessions.insert(accession_id);
742 }
743 }
744 "annotation_groups" => {
745 let name = parse_string(item, pk_column);
746
747 created_annotation_groups.push(AnnotationGroup { name });
748 }
749 "annotation_group_samples" => {
750 let annotation_group = parse_string(item, 0);
751 let sample_name = parse_string(item, 1);
752
753 created_annotation_group_samples.push(AnnotationGroupSample {
754 annotation_group,
755 sample_name: sample_name.clone(),
756 });
757
758 if !created_samples_set.contains(&sample_name) {
759 previous_samples.insert(sample_name);
760 }
761 }
762 t => {
763 println!("unhandled table {t}")
764 }
765 }
766 }
767 }
768
769 let existing_edges = Edge::query_by_ids(
771 conn,
772 &previous_edges.clone().into_iter().collect::<Vec<_>>(),
773 );
774 let mut new_nodes = vec![];
775 for edge in existing_edges.iter() {
776 if !previous_nodes.contains(&edge.source_node_id) && !is_terminal(edge.source_node_id) {
777 previous_nodes.insert(edge.source_node_id);
778 new_nodes.push(edge.source_node_id);
779 }
780 if !previous_nodes.contains(&edge.target_node_id) && !is_terminal(edge.target_node_id) {
781 previous_nodes.insert(edge.target_node_id);
782 new_nodes.push(edge.target_node_id);
783 }
784 }
785 for node in Node::query_by_ids(conn, &new_nodes) {
786 previous_sequences.insert(node.sequence_hash);
787 }
788
789 let changeset_models = ChangesetModels {
790 sequences: created_sequences,
791 block_groups: created_block_groups,
792 nodes: created_nodes,
793 edges: created_edges,
794 block_group_edges: created_bg_edges,
795 samples: created_samples,
796 sample_lineages: created_sample_lineages,
797 collections: created_collections,
798 paths: created_paths,
799 path_edges: created_path_edges,
800 accessions: created_accessions,
801 accession_edges: created_accession_edges,
802 accession_paths: created_accession_paths,
803 annotation_groups: created_annotation_groups,
804 annotations: created_annotations,
805 annotation_group_samples: created_annotation_group_samples,
806 };
807
808 let dependency_models = DependencyModels {
809 collections: Collection::query_by_ids(conn, &previous_collections),
810 samples: Sample::query_by_ids(conn, &previous_samples),
811 sequences: Sequence::query_by_ids(conn, &previous_sequences),
812 block_group: BlockGroup::query_by_ids(conn, &previous_block_groups),
813 nodes: Node::query_by_ids(conn, &previous_nodes),
814 edges: Edge::query_by_ids(conn, &previous_edges),
815 paths: Path::query_by_ids(conn, &previous_paths),
816 accessions: Accession::query_by_ids(conn, &previous_accessions),
817 accession_edges: AccessionEdge::query_by_ids(conn, &previous_accession_edges),
818 };
819
820 (changeset_models, dependency_models)
821}
822
823pub fn apply_changeset(
824 conn: &GraphConnection,
825 changeset: &ChangesetModels,
826 dependencies: &DependencyModels,
827) -> Result<(), ChangesetError> {
828 for collection in dependencies.collections.iter() {
829 Collection::create(conn, &collection.name);
830 }
831
832 for sample in dependencies.samples.iter() {
833 Sample::get_or_create(conn, &sample.name);
834 }
835
836 for sequence in dependencies.sequences.iter() {
837 NewSequence::from(sequence).save(conn);
838 }
839 for node in dependencies.nodes.iter() {
840 if !is_terminal(node.id) {
841 assert!(Sequence::get_by_id(conn, &node.sequence_hash).is_some());
842 }
843 }
844
845 for bg in block_groups_parent_first(&dependencies.block_group) {
846 BlockGroup::create(
847 conn,
848 NewBlockGroup {
849 collection_name: &bg.collection_name,
850 sample_name: &bg.sample_name,
851 name: &bg.name,
852 parent_block_group_id: bg.parent_block_group_id.as_ref(),
853 is_default: bg.is_default,
854 },
855 );
856 }
857
858 for node in dependencies.nodes.iter() {
859 Node::create(conn, &node.sequence_hash, &node.id);
860 }
861
862 Edge::bulk_create(
863 conn,
864 &dependencies
865 .edges
866 .iter()
867 .map(EdgeData::from)
868 .collect::<Vec<_>>(),
869 );
870
871 for path in dependencies.paths.iter() {
872 Path::create(conn, &path.name, &path.block_group_id, &[]);
873 }
874
875 AccessionEdge::bulk_create(
876 conn,
877 &dependencies
878 .accession_edges
879 .iter()
880 .map(AccessionEdgeData::from)
881 .collect::<Vec<AccessionEdgeData>>(),
882 );
883
884 for accession in dependencies.accessions.iter() {
885 Accession::get_or_create(
886 conn,
887 &accession.name,
888 &accession.path_id,
889 accession.parent_accession_id.as_ref(),
890 );
891 }
892
893 for collection in &changeset.collections {
894 Collection::create(conn, &collection.name);
895 }
896 for sample in &changeset.samples {
897 Sample::get_or_create(conn, &sample.name);
898 }
899 for sample_lineage in &changeset.sample_lineages {
900 SampleLineage::create(
901 conn,
902 &sample_lineage.parent_sample_name,
903 &sample_lineage.child_sample_name,
904 )?;
905 }
906 for sequence in &changeset.sequences {
907 NewSequence::from(sequence).save(conn);
908 }
909 for bg in block_groups_parent_first(&changeset.block_groups) {
910 BlockGroup::create(
911 conn,
912 NewBlockGroup {
913 collection_name: &bg.collection_name,
914 sample_name: &bg.sample_name,
915 name: &bg.name,
916 parent_block_group_id: bg.parent_block_group_id.as_ref(),
917 is_default: bg.is_default,
918 },
919 );
920 }
921 for node in &changeset.nodes {
922 Node::create(conn, &node.sequence_hash, &node.id);
923 }
924
925 Edge::bulk_create(
926 conn,
927 &changeset
928 .edges
929 .iter()
930 .map(EdgeData::from)
931 .collect::<Vec<_>>(),
932 );
933 BlockGroupEdge::bulk_create(
934 conn,
935 &changeset
936 .block_group_edges
937 .iter()
938 .map(BlockGroupEdgeData::from)
939 .collect::<Vec<BlockGroupEdgeData>>(),
940 );
941
942 for path in &changeset.paths {
943 Path::create(conn, &path.name, &path.block_group_id, &[]);
944 let edges = changeset
945 .path_edges
946 .iter()
947 .filter(|edge| edge.path_id == path.id)
948 .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
949 .map(|path_edge| path_edge.edge_id);
950 PathEdge::bulk_create(conn, &path.id, &edges.collect::<Vec<_>>());
951 }
952
953 AccessionEdge::bulk_create(
954 conn,
955 &changeset
956 .accession_edges
957 .iter()
958 .map(AccessionEdgeData::from)
959 .collect::<Vec<_>>(),
960 );
961
962 for accession in &changeset.accessions {
963 Accession::get_or_create(
964 conn,
965 &accession.name,
966 &accession.path_id,
967 accession.parent_accession_id.as_ref(),
968 );
969 let edges = changeset
970 .accession_paths
971 .iter()
972 .filter(|ap| ap.accession_id == accession.id)
973 .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
974 .map(|ap| ap.edge_id);
975 AccessionPath::create(conn, &accession.id, &edges.collect::<Vec<_>>());
976 }
977
978 for annotation_group in &changeset.annotation_groups {
979 AnnotationGroup::get_or_create(conn, &annotation_group.name).map_err(|err| match err {
980 AnnotationGroupError::DatabaseError(inner) => inner,
981 })?;
982 }
983
984 for annotation in &changeset.annotations {
985 Annotation::get_or_create(
986 conn,
987 &annotation.name,
988 &annotation.group,
989 &annotation.accession_id,
990 )
991 .map_err(|err| match err {
992 AnnotationError::DatabaseError(inner) => inner,
993 AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
994 inner
995 }
996 })?;
997 }
998
999 for annotation_group_sample in &changeset.annotation_group_samples {
1000 AnnotationGroupSample::create(
1001 conn,
1002 &annotation_group_sample.annotation_group,
1003 &annotation_group_sample.sample_name,
1004 )
1005 .map_err(|err| match err {
1006 AnnotationError::DatabaseError(inner) => inner,
1007 AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
1008 inner
1009 }
1010 })?;
1011 }
1012 Ok(())
1013}
1014
1015pub fn revert_changeset(
1016 conn: &GraphConnection,
1017 changeset: &ChangesetModels,
1018) -> Result<(), ChangesetError> {
1019 for sample_lineage in &changeset.sample_lineages {
1020 SampleLineage::delete(
1021 conn,
1022 &sample_lineage.parent_sample_name,
1023 &sample_lineage.child_sample_name,
1024 )?;
1025 }
1026 for annotation_group_sample in &changeset.annotation_group_samples {
1027 AnnotationGroupSample::delete(
1028 conn,
1029 &annotation_group_sample.annotation_group,
1030 &annotation_group_sample.sample_name,
1031 )
1032 .map_err(|err| match err {
1033 AnnotationError::DatabaseError(inner) => inner,
1034 AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
1035 inner
1036 }
1037 })?;
1038 }
1039 Annotation::delete_by_ids(
1040 conn,
1041 &changeset
1042 .annotations
1043 .iter()
1044 .map(|obj| obj.id)
1045 .collect::<Vec<_>>(),
1046 );
1047 AnnotationGroup::delete_by_ids(
1048 conn,
1049 &changeset
1050 .annotation_groups
1051 .iter()
1052 .map(|group| group.name.clone())
1053 .collect::<Vec<_>>(),
1054 );
1055 AccessionPath::delete_by_ids(
1056 conn,
1057 &changeset
1058 .accession_paths
1059 .iter()
1060 .map(|obj| obj.id)
1061 .collect::<Vec<_>>(),
1062 );
1063 AccessionEdge::delete_by_ids(
1065 conn,
1066 &changeset
1067 .accession_edges
1068 .iter()
1069 .map(|pe| pe.id)
1070 .collect::<Vec<_>>(),
1071 );
1072 Accession::delete_by_ids(
1073 conn,
1074 &changeset
1075 .accessions
1076 .iter()
1077 .map(|obj| obj.id)
1078 .collect::<Vec<_>>(),
1079 );
1080 PathEdge::delete_by_ids(
1081 conn,
1082 &changeset
1083 .path_edges
1084 .iter()
1085 .map(|pe| pe.id)
1086 .collect::<Vec<_>>(),
1087 );
1088 Path::delete_by_ids(
1089 conn,
1090 &changeset.paths.iter().map(|p| p.id).collect::<Vec<_>>(),
1091 );
1092
1093 BlockGroupEdge::delete_by_ids(
1094 conn,
1095 &changeset
1096 .block_group_edges
1097 .iter()
1098 .map(|obj| obj.id)
1099 .collect::<Vec<_>>(),
1100 );
1101 Edge::delete_by_ids(
1102 conn,
1103 &changeset.edges.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1104 );
1105
1106 BlockGroup::delete_by_ids(
1107 conn,
1108 &changeset
1109 .block_groups
1110 .iter()
1111 .map(|obj| obj.id)
1112 .collect::<Vec<_>>(),
1113 );
1114
1115 Node::delete_by_ids(
1116 conn,
1117 &changeset.nodes.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1118 );
1119 Collection::delete_by_ids(
1120 conn,
1121 &changeset
1122 .collections
1123 .iter()
1124 .map(|obj| obj.name.clone())
1125 .collect::<Vec<_>>(),
1126 );
1127 Sample::delete_by_ids(
1128 conn,
1129 &changeset
1130 .samples
1131 .iter()
1132 .map(|obj| obj.name.clone())
1133 .collect::<Vec<_>>(),
1134 );
1135 Sequence::delete_by_ids(
1136 conn,
1137 &changeset
1138 .sequences
1139 .iter()
1140 .map(|obj| obj.hash)
1141 .collect::<Vec<_>>(),
1142 );
1143 Ok(())
1144}
1145
1146pub fn get_changeset_from_path(path: PathBuf) -> DatabaseChangeset {
1147 use capnp::serialize_packed;
1148 let file = fs::File::open(path).unwrap();
1149 let mut reader = std::io::BufReader::new(file);
1150
1151 let message_reader =
1152 serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1153 let root = message_reader
1154 .get_root::<database_changeset::Reader>()
1155 .unwrap();
1156 DatabaseChangeset::read_capnp(root)
1157}
1158
1159pub fn get_changeset_dependencies_from_path(path: PathBuf) -> DependencyModels {
1160 use capnp::serialize_packed;
1161
1162 let file = fs::File::open(path).unwrap();
1163 let mut reader = std::io::BufReader::new(file);
1164 let message_reader =
1165 serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1166 let root = message_reader
1167 .get_root::<crate::gen_models_capnp::dependency_models::Reader>()
1168 .unwrap();
1169 DependencyModels::read_capnp(root)
1170}
1171
1172fn block_groups_parent_first(block_groups: &[BlockGroup]) -> Vec<&BlockGroup> {
1174 let by_id = block_groups
1175 .iter()
1176 .map(|block_group| (block_group.id, block_group))
1177 .collect::<HashMap<_, _>>();
1178
1179 fn walk<'a>(
1180 block_group_id: HashId,
1181 by_id: &HashMap<HashId, &'a BlockGroup>,
1182 visiting: &mut HashSet<HashId>,
1183 visited: &mut HashSet<HashId>,
1184 ordered: &mut Vec<&'a BlockGroup>,
1185 ) {
1186 if visited.contains(&block_group_id) || !visiting.insert(block_group_id) {
1187 return;
1188 }
1189
1190 let block_group = by_id[&block_group_id];
1191 if let Some(parent_id) = block_group.parent_block_group_id
1192 && by_id.contains_key(&parent_id)
1193 {
1194 walk(parent_id, by_id, visiting, visited, ordered);
1195 }
1196
1197 visiting.remove(&block_group_id);
1198 if visited.insert(block_group_id) {
1199 ordered.push(block_group);
1200 }
1201 }
1202
1203 let mut ordered = Vec::new();
1204 let mut visiting = HashSet::new();
1205 let mut visited = HashSet::new();
1206 for block_group in block_groups {
1207 walk(
1208 block_group.id,
1209 &by_id,
1210 &mut visiting,
1211 &mut visited,
1212 &mut ordered,
1213 );
1214 }
1215
1216 ordered
1217}
1218
1219pub fn write_changeset(
1220 workspace: &Workspace,
1221 operation: &Operation,
1222 changes: DatabaseChangeset,
1223 dependencies: &DependencyModels,
1224) {
1225 use capnp::{message::Builder, serialize_packed};
1226
1227 let change_path = operation.get_changeset_path(workspace);
1228 let dependency_path = operation.get_changeset_dependencies_path(workspace);
1229
1230 let mut dependency_file = fs::File::create_new(&dependency_path)
1232 .unwrap_or_else(|_| panic!("Unable to open {dependency_path:?}"));
1233 let mut message = Builder::new_default();
1234 let mut dep_root = message.init_root();
1235 dependencies.write_capnp(&mut dep_root);
1236 serialize_packed::write_message(&mut dependency_file, &message).unwrap();
1237
1238 let mut file = fs::File::create_new(&change_path)
1240 .unwrap_or_else(|_| panic!("Unable to open {change_path:?}"));
1241 let mut message = Builder::new_default();
1242 let mut change_root = message.init_root();
1243 changes.write_capnp(&mut change_root);
1244 serialize_packed::write_message(&mut file, &message).unwrap();
1245}
1246
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;
    use crate::{
        file_types::FileTypes,
        operations::{OperationFile, OperationInfo},
        session_operations::{end_operation, start_operation},
        test_helpers::{setup_block_group, setup_gen},
        traits::Query,
    };

    /// Builds a `ChangesetModels` with one populated entry (two for nodes) in every table.
    /// The Cap'n Proto round-trip tests share it.
    fn sample_changeset_models() -> ChangesetModels {
        ChangesetModels {
            collections: vec![crate::collection::Collection {
                name: "test_collection".to_string(),
            }],
            samples: vec![crate::sample::Sample {
                name: "test_sample".to_string(),
            }],
            sample_lineages: vec![SampleLineage {
                parent_sample_name: "parent_sample".to_string(),
                child_sample_name: "test_sample".to_string(),
            }],
            sequences: vec![
                NewSequence::new()
                    .sequence("ATCG")
                    .sequence_type("DNA")
                    .name("test_seq")
                    .build(),
            ],
            block_groups: vec![BlockGroup {
                id: HashId::pad_str(1),
                collection_name: "test_collection".to_string(),
                sample_name: "test_sample".to_string(),
                name: "test_bg".to_string(),
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
                parent_block_group_id: None,
                is_default: false,
            }],
            nodes: vec![
                Node {
                    id: HashId::convert_str("node_hash"),
                    sequence_hash: HashId::convert_str("test_hash"),
                },
                Node {
                    id: HashId::convert_str("node_hash_2"),
                    sequence_hash: HashId::convert_str("test_hash_2"),
                },
            ],
            edges: vec![Edge {
                id: HashId::pad_str(1),
                source_node_id: HashId::convert_str("1"),
                source_coordinate: 0,
                source_strand: Strand::Forward,
                target_node_id: HashId::convert_str("2"),
                target_coordinate: 0,
                target_strand: Strand::Forward,
            }],
            block_group_edges: vec![crate::block_group_edge::BlockGroupEdge {
                id: HashId::pad_str(1),
                block_group_id: HashId::pad_str(1),
                edge_id: HashId::pad_str(1),
                chromosome_index: 0,
                phased: 0,
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            paths: vec![Path {
                id: HashId::pad_str(1),
                block_group_id: HashId::pad_str(1),
                name: "test_path".to_string(),
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            path_edges: vec![PathEdge {
                id: HashId::pad_str(1),
                path_id: HashId::pad_str(1),
                index_in_path: 0,
                edge_id: HashId::pad_str(1),
            }],
            accessions: vec![Accession {
                id: HashId::pad_str(1),
                name: "test_accession".to_string(),
                path_id: HashId::pad_str(1),
                parent_accession_id: None,
            }],
            accession_edges: vec![AccessionEdge {
                id: HashId::pad_str(1),
                source_node_id: HashId::convert_str("1"),
                source_coordinate: 0,
                source_strand: Strand::Forward,
                target_node_id: HashId::convert_str("2"),
                target_coordinate: 0,
                target_strand: Strand::Forward,
                chromosome_index: 0,
            }],
            accession_paths: vec![AccessionPath {
                id: HashId::pad_str(1),
                accession_id: HashId::pad_str(1),
                index_in_path: 0,
                edge_id: HashId::pad_str(1),
            }],
            annotation_groups: vec![AnnotationGroup {
                name: "gff3".to_string(),
            }],
            annotations: vec![Annotation {
                id: HashId::pad_str(1),
                name: "test_annotation".to_string(),
                group: "gff3".to_string(),
                accession_id: HashId::pad_str(1),
            }],
            annotation_group_samples: vec![AnnotationGroupSample {
                annotation_group: "gff3".to_string(),
                sample_name: "test_sample".to_string(),
            }],
        }
    }

    /// Builds a block group in collection "test" named "bg", used by the lineage-ordering
    /// tests.
    fn lineage_block_group(
        id: HashId,
        sample_name: &str,
        parent_block_group_id: Option<HashId>,
    ) -> BlockGroup {
        BlockGroup {
            id,
            collection_name: "test".to_string(),
            sample_name: sample_name.to_string(),
            name: "bg".to_string(),
            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            parent_block_group_id,
            is_default: false,
        }
    }

    #[test]
    fn test_database_changeset_capnp_serialization() {
        use capnp::message::TypedBuilder;

        let changeset = DatabaseChangeset {
            db_path: "/path/to/db".to_string(),
            changes: sample_changeset_models(),
        };

        let mut message = TypedBuilder::<database_changeset::Owned>::new_default();
        let mut root = message.init_root();
        changeset.write_capnp(&mut root);

        let deserialized = DatabaseChangeset::read_capnp(root.into_reader());
        assert_eq!(changeset, deserialized);
    }

    #[test]
    fn test_database_changeset_get_db_path() {
        use std::io::Write;

        use capnp::{message::Builder, serialize_packed};
        use tempfile::NamedTempFile;

        let expected_db_path = "/tmp/test_db_path";
        let changeset = DatabaseChangeset {
            db_path: expected_db_path.to_string(),
            changes: ChangesetModels {
                collections: vec![],
                samples: vec![],
                sample_lineages: vec![],
                sequences: vec![],
                block_groups: vec![],
                nodes: vec![],
                edges: vec![],
                block_group_edges: vec![],
                paths: vec![],
                path_edges: vec![],
                accessions: vec![],
                accession_edges: vec![],
                accession_paths: vec![],
                annotation_groups: vec![],
                annotations: vec![],
                annotation_group_samples: vec![],
            },
        };

        let mut temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        let mut message = Builder::new_default();
        let mut root = message.init_root();
        changeset.write_capnp(&mut root);
        serialize_packed::write_message(&mut temp_file, &message).unwrap();
        temp_file.flush().unwrap();

        let db_path = DatabaseChangeset::get_db_path(path.as_path());

        assert_eq!(db_path, expected_db_path);
    }

    #[test]
    fn test_changeset_models_capnp_serialization() {
        use capnp::message::TypedBuilder;

        let changeset_models = sample_changeset_models();

        let mut message = TypedBuilder::<changeset_models::Owned>::new_default();
        let mut root = message.init_root();
        changeset_models.write_capnp(&mut root);

        let deserialized = ChangesetModels::read_capnp(root.into_reader());

        assert_eq!(changeset_models, deserialized);
    }

    #[test]
    fn test_changeset_includes_annotations() {
        use crate::block_group::PathCache;

        let context = setup_gen();
        let conn = context.graph().conn();
        let op_conn = context.operations().conn();

        let db_uuid = crate::metadata::get_db_uuid(conn);
        crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path").unwrap();

        let _ = Sample::create(conn, "sample-1").unwrap();
        let (block_group_id, path) = setup_block_group(conn);
        let mut cache = PathCache::new(conn);
        let _ = PathCache::lookup(&mut cache, &block_group_id, path.name.clone());
        let accession = BlockGroup::add_accession(conn, &path, "ann-accession", 0, 5, &mut cache);

        let mut session = start_operation(conn);
        let annotation = Annotation::get_or_create(conn, "gene-a", "gff3", &accession.id).unwrap();
        annotation.add_samples(conn, &["sample-1"]).unwrap();

        let operation = end_operation(
            &context,
            &mut session,
            &OperationInfo {
                files: vec![],
                description: "annotation op".to_string(),
            },
            "annotation op",
            None,
        )
        .unwrap();

        let changeset = operation.get_changeset(context.workspace());
        assert_eq!(
            changeset.changes.annotation_groups,
            vec![AnnotationGroup {
                name: "gff3".to_string(),
            }]
        );
        assert_eq!(changeset.changes.annotations, vec![annotation.clone()]);
        assert_eq!(
            changeset.changes.annotation_group_samples,
            vec![AnnotationGroupSample {
                annotation_group: annotation.group.clone(),
                sample_name: "sample-1".to_string(),
            }]
        );

        let dependencies = operation.get_changeset_dependencies(context.workspace());
        assert_eq!(dependencies.samples.len(), 1);
        assert_eq!(dependencies.samples[0].name, "sample-1");
        assert_eq!(dependencies.accessions.len(), 1);
        assert_eq!(dependencies.accessions[0].id, accession.id);
    }

    #[test]
    fn test_changeset_includes_sample_lineage() {
        let context = setup_gen();
        let conn = context.graph().conn();
        let op_conn = context.operations().conn();

        let db_uuid = crate::metadata::get_db_uuid(conn);
        crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path").unwrap();

        let _ = Sample::create(conn, "parent").unwrap();

        let mut session = start_operation(conn);
        let _ = Sample::create(conn, "child").unwrap();
        SampleLineage::create(conn, "parent", "child").unwrap();

        let operation = end_operation(
            &context,
            &mut session,
            &OperationInfo {
                files: vec![],
                description: "sample lineage op".to_string(),
            },
            "sample lineage op",
            None,
        )
        .unwrap();

        let changeset = operation.get_changeset(context.workspace());
        assert_eq!(
            changeset.changes.sample_lineages,
            vec![SampleLineage {
                parent_sample_name: "parent".to_string(),
                child_sample_name: "child".to_string(),
            }]
        );

        let dependencies = operation.get_changeset_dependencies(context.workspace());
        assert_eq!(dependencies.samples.len(), 1);
        assert_eq!(dependencies.samples[0].name, "parent");
    }

    #[test]
    fn test_block_groups_parent_first() {
        let parent = lineage_block_group(HashId::pad_str(1), "parent", None);
        let child = lineage_block_group(HashId::pad_str(2), "child", Some(parent.id));

        let block_groups = [child.clone(), parent.clone()];
        let ordered = block_groups_parent_first(&block_groups);
        assert_eq!(ordered, vec![&parent, &child]);
    }

    #[test]
    fn test_block_groups_parent_first_with_three_level_lineage() {
        let parent = lineage_block_group(HashId::pad_str(1), "parent", None);
        let child = lineage_block_group(HashId::pad_str(2), "child", Some(parent.id));
        let grandchild = lineage_block_group(HashId::pad_str(3), "grandchild", Some(child.id));

        let block_groups = [grandchild.clone(), child.clone(), parent.clone()];
        let ordered = block_groups_parent_first(&block_groups);
        assert_eq!(ordered, vec![&parent, &child, &grandchild]);
    }

    #[test]
    fn test_block_groups_parent_first_when_every_entry_has_a_parent() {
        let root_id = HashId::pad_str(1);
        let child = lineage_block_group(HashId::pad_str(2), "child", Some(root_id));
        let grandchild = lineage_block_group(HashId::pad_str(3), "grandchild", Some(child.id));

        let block_groups = [grandchild.clone(), child.clone()];
        let ordered = block_groups_parent_first(&block_groups);
        assert_eq!(ordered, vec![&child, &grandchild]);
    }

    mod changeset_dependencies {
        use super::*;

        #[test]
        fn test_tracks_nodes_and_sequences_from_previous_block_group_edges() {
            let context = setup_gen();
            let conn = context.graph().conn();
            let op_conn = context.operations().conn();

            let db_uuid = crate::metadata::get_db_uuid(conn);
            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
                .unwrap();

            let (bg_id, _path_id) = setup_block_group(conn);
            let old_edges = BlockGroupEdge::edges_for_block_group(conn, &bg_id);

            let mut session = start_operation(conn);
            let _ = Sample::create(conn, "new").unwrap();
            let new_bg = BlockGroup::create(
                conn,
                NewBlockGroup {
                    collection_name: "test",
                    sample_name: "new",
                    name: "new-bg",
                    ..Default::default()
                },
            );
            let shared_edge = old_edges[0].edge.clone();
            BlockGroupEdge::bulk_create(
                conn,
                &[BlockGroupEdgeData {
                    block_group_id: new_bg.id,
                    edge_id: shared_edge.id,
                    chromosome_index: 0,
                    phased: 0,
                }],
            );
            let operation = end_operation(
                &context,
                &mut session,
                &OperationInfo {
                    files: vec![],
                    description: "test".to_string(),
                },
                "test",
                None,
            )
            .unwrap();

            let dependencies = operation.get_changeset_dependencies(context.workspace());
            assert_eq!(dependencies.nodes[0].id, shared_edge.target_node_id);
            assert_eq!(dependencies.nodes.len(), 1);
            let nodes = Node::query_by_ids(conn, &[shared_edge.target_node_id]);
            assert_eq!(dependencies.sequences[0].hash, nodes[0].sequence_hash);
            assert_eq!(dependencies.sequences.len(), 1);
        }

        #[test]
        fn test_records_patch_dependencies() {
            let context = setup_gen();
            let conn = context.graph().conn();
            let op_conn = context.operations().conn();

            let db_uuid = crate::metadata::get_db_uuid(conn);
            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
                .unwrap();

            let (bg_id, _path_id) = setup_block_group(conn);
            let dep_bg = BlockGroup::get_by_id(conn, &bg_id);

            let existing_seq = Sequence::new()
                .sequence_type("DNA")
                .sequence("AAAATTTT")
                .save(conn);
            let existing_node_id =
                Node::create(conn, &existing_seq.hash, &HashId::convert_str("1"));

            let mut session = start_operation(conn);

            let random_seq = Sequence::new()
                .sequence_type("DNA")
                .sequence("ATCG")
                .save(conn);
            let random_node_id = Node::create(conn, &random_seq.hash, &HashId::convert_str("2"));

            let new_edge = Edge::create(
                conn,
                random_node_id,
                0,
                Strand::Forward,
                existing_node_id,
                0,
                Strand::Forward,
            );
            let block_group_edge = BlockGroupEdgeData {
                block_group_id: bg_id,
                edge_id: new_edge.id,
                chromosome_index: 0,
                phased: 0,
            };
            BlockGroupEdge::bulk_create(conn, &[block_group_edge]);
            let operation = end_operation(
                &context,
                &mut session,
                &OperationInfo {
                    files: vec![OperationFile {
                        file_path: "test".to_string(),
                        file_type: FileTypes::None,
                    }],
                    description: "test".to_string(),
                },
                "test",
                None,
            )
            .unwrap();

            let dependencies = operation.get_changeset_dependencies(context.workspace());
            assert_eq!(dependencies.sequences.len(), 1);
            assert_eq!(
                dependencies.block_group[0].collection_name,
                dep_bg.collection_name
            );
            assert_eq!(dependencies.block_group[0].name, dep_bg.name);
            assert_eq!(dependencies.block_group[0].sample_name, dep_bg.sample_name);
        }
    }
}