1use std::{
2 collections::HashSet,
3 convert::TryInto,
4 fs,
5 io::Read,
6 path::{Path as StdPath, PathBuf},
7 str,
8};
9
10use gen_core::{HashId, Strand, config::Workspace, is_terminal, traits::Capnp};
11use itertools::Itertools;
12use rusqlite::{
13 session::{ChangesetItem, ChangesetIter},
14 types::FromSql,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::{
19 accession::{Accession, AccessionEdge, AccessionEdgeData, AccessionPath},
20 annotations::{
21 Annotation, AnnotationError, AnnotationGroup, AnnotationGroupError, AnnotationGroupSample,
22 },
23 block_group::BlockGroup,
24 block_group_edge::{BlockGroupEdge, BlockGroupEdgeData},
25 collection::Collection,
26 db::GraphConnection,
27 edge::{Edge, EdgeData},
28 errors::ChangesetError,
29 gen_models_capnp::{changeset_models, database_changeset},
30 node::Node,
31 operations::Operation,
32 path::Path,
33 path_edge::PathEdge,
34 sample::Sample,
35 sequence::{NewSequence, Sequence},
36 session_operations::DependencyModels,
37 traits::Query,
38};
39
40#[derive(Debug, Deserialize, Serialize, PartialEq)]
41pub struct DatabaseChangeset {
42 pub db_path: String,
43 pub changes: ChangesetModels,
44}
45
46impl DatabaseChangeset {
47 pub fn get_db_path(path: &StdPath) -> String {
48 use capnp::serialize_packed;
49
50 let file = fs::File::open(path).unwrap();
51 let mut reader = std::io::BufReader::new(file);
52
53 let message_reader =
54 serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new())
55 .unwrap();
56 let root = message_reader
57 .get_root::<database_changeset::Reader>()
58 .unwrap();
59 root.get_db_path().unwrap().to_string().unwrap()
60 }
61}
62
63impl<'a> Capnp<'a> for DatabaseChangeset {
64 type Builder = database_changeset::Builder<'a>;
65 type Reader = database_changeset::Reader<'a>;
66
67 fn write_capnp(&self, builder: &mut Self::Builder) {
68 builder.set_db_path(&self.db_path);
69 let mut changeset_builder = builder.reborrow().init_changes();
70 self.changes.write_capnp(&mut changeset_builder);
71 }
72
73 fn read_capnp(reader: Self::Reader) -> Self {
74 let db_path = reader.get_db_path().unwrap().to_string().unwrap();
75 let changeset_reader = reader.get_changes().unwrap();
76 let changes = ChangesetModels::read_capnp(changeset_reader);
77 DatabaseChangeset { db_path, changes }
78 }
79}
80
81#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
82pub struct ChangesetModels {
83 pub collections: Vec<crate::collection::Collection>,
84 pub samples: Vec<crate::sample::Sample>,
85 pub sequences: Vec<Sequence>,
86 pub block_groups: Vec<BlockGroup>,
87 pub nodes: Vec<Node>,
88 pub edges: Vec<Edge>,
89 pub block_group_edges: Vec<crate::block_group_edge::BlockGroupEdge>,
90 pub paths: Vec<Path>,
91 pub path_edges: Vec<PathEdge>,
92 pub accessions: Vec<Accession>,
93 pub accession_edges: Vec<AccessionEdge>,
94 pub accession_paths: Vec<AccessionPath>,
95 pub annotation_groups: Vec<AnnotationGroup>,
96 pub annotations: Vec<Annotation>,
97 pub annotation_group_samples: Vec<AnnotationGroupSample>,
98}
99
100impl<'a> Capnp<'a> for ChangesetModels {
101 type Builder = changeset_models::Builder<'a>;
102 type Reader = changeset_models::Reader<'a>;
103
104 fn write_capnp(&self, builder: &mut Self::Builder) {
105 let mut collections_builder = builder
107 .reborrow()
108 .init_collections(self.collections.len() as u32);
109 for (i, collection) in self.collections.iter().enumerate() {
110 let mut collection_builder = collections_builder.reborrow().get(i as u32);
111 collection.write_capnp(&mut collection_builder);
112 }
113
114 let mut samples_builder = builder.reborrow().init_samples(self.samples.len() as u32);
116 for (i, sample) in self.samples.iter().enumerate() {
117 let mut sample_builder = samples_builder.reborrow().get(i as u32);
118 sample.write_capnp(&mut sample_builder);
119 }
120
121 let mut sequences_builder = builder
123 .reborrow()
124 .init_sequences(self.sequences.len() as u32);
125 for (i, sequence) in self.sequences.iter().enumerate() {
126 let mut sequence_builder = sequences_builder.reborrow().get(i as u32);
127 sequence.write_capnp(&mut sequence_builder);
128 }
129
130 let mut block_groups_builder = builder
132 .reborrow()
133 .init_block_groups(self.block_groups.len() as u32);
134 for (i, block_group) in self.block_groups.iter().enumerate() {
135 let mut block_group_builder = block_groups_builder.reborrow().get(i as u32);
136 block_group.write_capnp(&mut block_group_builder);
137 }
138
139 let mut nodes_builder = builder.reborrow().init_nodes(self.nodes.len() as u32);
141 for (i, node) in self.nodes.iter().enumerate() {
142 let mut node_builder = nodes_builder.reborrow().get(i as u32);
143 node.write_capnp(&mut node_builder);
144 }
145
146 let mut edges_builder = builder.reborrow().init_edges(self.edges.len() as u32);
148 for (i, edge) in self.edges.iter().enumerate() {
149 let mut edge_builder = edges_builder.reborrow().get(i as u32);
150 edge.write_capnp(&mut edge_builder);
151 }
152
153 let mut block_group_edges_builder = builder
155 .reborrow()
156 .init_block_group_edges(self.block_group_edges.len() as u32);
157 for (i, block_group_edge) in self.block_group_edges.iter().enumerate() {
158 let mut block_group_edge_builder = block_group_edges_builder.reborrow().get(i as u32);
159 block_group_edge.write_capnp(&mut block_group_edge_builder);
160 }
161
162 let mut paths_builder = builder.reborrow().init_paths(self.paths.len() as u32);
164 for (i, path) in self.paths.iter().enumerate() {
165 let mut path_builder = paths_builder.reborrow().get(i as u32);
166 path.write_capnp(&mut path_builder);
167 }
168
169 let mut path_edges_builder = builder
171 .reborrow()
172 .init_path_edges(self.path_edges.len() as u32);
173 for (i, path_edge) in self.path_edges.iter().enumerate() {
174 let mut path_edge_builder = path_edges_builder.reborrow().get(i as u32);
175 path_edge.write_capnp(&mut path_edge_builder);
176 }
177
178 let mut accessions_builder = builder
180 .reborrow()
181 .init_accessions(self.accessions.len() as u32);
182 for (i, accession) in self.accessions.iter().enumerate() {
183 let mut accession_builder = accessions_builder.reborrow().get(i as u32);
184 accession.write_capnp(&mut accession_builder);
185 }
186
187 let mut accession_edges_builder = builder
189 .reborrow()
190 .init_accession_edges(self.accession_edges.len() as u32);
191 for (i, accession_edge) in self.accession_edges.iter().enumerate() {
192 let mut accession_edge_builder = accession_edges_builder.reborrow().get(i as u32);
193 accession_edge.write_capnp(&mut accession_edge_builder);
194 }
195
196 let mut accession_paths_builder = builder
198 .reborrow()
199 .init_accession_paths(self.accession_paths.len() as u32);
200 for (i, accession_path) in self.accession_paths.iter().enumerate() {
201 let mut accession_path_builder = accession_paths_builder.reborrow().get(i as u32);
202 accession_path.write_capnp(&mut accession_path_builder);
203 }
204
205 let mut annotation_groups_builder = builder
207 .reborrow()
208 .init_annotation_groups(self.annotation_groups.len() as u32);
209 for (i, annotation_group) in self.annotation_groups.iter().enumerate() {
210 let mut annotation_group_builder = annotation_groups_builder.reborrow().get(i as u32);
211 annotation_group.write_capnp(&mut annotation_group_builder);
212 }
213
214 let mut annotations_builder = builder
216 .reborrow()
217 .init_annotations(self.annotations.len() as u32);
218 for (i, annotation) in self.annotations.iter().enumerate() {
219 let mut annotation_builder = annotations_builder.reborrow().get(i as u32);
220 annotation.write_capnp(&mut annotation_builder);
221 }
222
223 let mut annotation_group_samples_builder = builder
225 .reborrow()
226 .init_annotation_group_samples(self.annotation_group_samples.len() as u32);
227 for (i, annotation_group_sample) in self.annotation_group_samples.iter().enumerate() {
228 let mut annotation_group_sample_builder =
229 annotation_group_samples_builder.reborrow().get(i as u32);
230 annotation_group_sample.write_capnp(&mut annotation_group_sample_builder);
231 }
232 }
233
234 fn read_capnp(reader: Self::Reader) -> Self {
235 let collections_reader = reader.get_collections().unwrap();
237 let mut collections = Vec::new();
238 for collection_reader in collections_reader.iter() {
239 collections.push(crate::collection::Collection::read_capnp(collection_reader));
240 }
241
242 let samples_reader = reader.get_samples().unwrap();
244 let mut samples = Vec::new();
245 for sample_reader in samples_reader.iter() {
246 samples.push(crate::sample::Sample::read_capnp(sample_reader));
247 }
248
249 let sequences_reader = reader.get_sequences().unwrap();
251 let mut sequences = Vec::new();
252 for sequence_reader in sequences_reader.iter() {
253 sequences.push(Sequence::read_capnp(sequence_reader));
254 }
255
256 let block_groups_reader = reader.get_block_groups().unwrap();
258 let mut block_groups = Vec::new();
259 for block_group_reader in block_groups_reader.iter() {
260 block_groups.push(BlockGroup::read_capnp(block_group_reader));
261 }
262
263 let nodes_reader = reader.get_nodes().unwrap();
265 let mut nodes = Vec::new();
266 for node_reader in nodes_reader.iter() {
267 nodes.push(Node::read_capnp(node_reader));
268 }
269
270 let edges_reader = reader.get_edges().unwrap();
272 let mut edges = Vec::new();
273 for edge_reader in edges_reader.iter() {
274 edges.push(Edge::read_capnp(edge_reader));
275 }
276
277 let block_group_edges_reader = reader.get_block_group_edges().unwrap();
279 let mut block_group_edges = Vec::new();
280 for block_group_edge_reader in block_group_edges_reader.iter() {
281 block_group_edges.push(crate::block_group_edge::BlockGroupEdge::read_capnp(
282 block_group_edge_reader,
283 ));
284 }
285
286 let paths_reader = reader.get_paths().unwrap();
288 let mut paths = Vec::new();
289 for path_reader in paths_reader.iter() {
290 paths.push(Path::read_capnp(path_reader));
291 }
292
293 let path_edges_reader = reader.get_path_edges().unwrap();
295 let mut path_edges = Vec::new();
296 for path_edge_reader in path_edges_reader.iter() {
297 path_edges.push(PathEdge::read_capnp(path_edge_reader));
298 }
299
300 let accessions_reader = reader.get_accessions().unwrap();
302 let mut accessions = Vec::new();
303 for accession_reader in accessions_reader.iter() {
304 accessions.push(Accession::read_capnp(accession_reader));
305 }
306
307 let accession_edges_reader = reader.get_accession_edges().unwrap();
309 let mut accession_edges = Vec::new();
310 for accession_edge_reader in accession_edges_reader.iter() {
311 accession_edges.push(AccessionEdge::read_capnp(accession_edge_reader));
312 }
313
314 let accession_paths_reader = reader.get_accession_paths().unwrap();
316 let mut accession_paths = Vec::new();
317 for accession_path_reader in accession_paths_reader.iter() {
318 accession_paths.push(AccessionPath::read_capnp(accession_path_reader));
319 }
320
321 let annotation_groups_reader = reader.get_annotation_groups().unwrap();
323 let mut annotation_groups = Vec::new();
324 for annotation_group_reader in annotation_groups_reader.iter() {
325 annotation_groups.push(AnnotationGroup::read_capnp(annotation_group_reader));
326 }
327
328 let annotations_reader = reader.get_annotations().unwrap();
330 let mut annotations = Vec::new();
331 for annotation_reader in annotations_reader.iter() {
332 annotations.push(Annotation::read_capnp(annotation_reader));
333 }
334
335 let annotation_group_samples_reader = reader.get_annotation_group_samples().unwrap();
337 let mut annotation_group_samples = Vec::new();
338 for annotation_group_sample_reader in annotation_group_samples_reader.iter() {
339 annotation_group_samples.push(AnnotationGroupSample::read_capnp(
340 annotation_group_sample_reader,
341 ));
342 }
343
344 ChangesetModels {
345 collections,
346 samples,
347 sequences,
348 block_groups,
349 nodes,
350 edges,
351 block_group_edges,
352 paths,
353 path_edges,
354 accessions,
355 accession_edges,
356 accession_paths,
357 annotation_groups,
358 annotations,
359 annotation_group_samples,
360 }
361 }
362}
363
364pub fn parse_string(item: &ChangesetItem, col: usize) -> String {
366 str::from_utf8(item.new_value(col).unwrap().as_bytes().unwrap())
367 .unwrap()
368 .to_string()
369}
370
371pub fn parse_maybe_string(item: &ChangesetItem, col: usize) -> Option<String> {
372 item.new_value(col)
373 .unwrap()
374 .as_bytes_or_null()
375 .unwrap()
376 .map(|v| str::from_utf8(v).unwrap().to_string())
377}
378
379pub fn parse_blob(item: &ChangesetItem, col: usize) -> [u8; 32] {
380 let bytes = item.new_value(col).unwrap().as_bytes().unwrap();
381
382 bytes.try_into().expect("blob must be exactly 32 bytes")
383}
384
385pub fn parse_maybe_blob(item: &ChangesetItem, col: usize) -> Option<[u8; 32]> {
386 item.new_value(col)
387 .unwrap()
388 .as_bytes_or_null()
389 .unwrap()
390 .map(|v| v.try_into().expect("blob must be exactly 32 bytes"))
391}
392
393pub fn parse_hashid(item: &ChangesetItem, col: usize) -> HashId {
394 HashId(parse_blob(item, col))
395}
396
397pub fn parse_maybe_hashid(item: &ChangesetItem, col: usize) -> Option<HashId> {
398 parse_maybe_blob(item, col).map(HashId)
399}
400
401pub fn parse_number(item: &ChangesetItem, col: usize) -> i64 {
402 item.new_value(col).unwrap().as_i64().unwrap()
403}
404
405pub fn parse_maybe_number(item: &ChangesetItem, col: usize) -> Option<i64> {
406 item.new_value(col).unwrap().as_i64_or_null().unwrap()
407}
408
409pub fn process_changesetiter(
410 conn: &GraphConnection,
411 mut changes: &[u8],
412) -> (ChangesetModels, DependencyModels) {
413 use fallible_streaming_iterator::FallibleStreamingIterator;
414 use gen_core::is_terminal;
415 use itertools::Itertools;
416
417 use crate::{
418 accession::{Accession, AccessionEdge},
419 block_group::BlockGroup,
420 edge::Edge,
421 node::Node,
422 path::Path,
423 sequence::Sequence,
424 traits::Query,
425 };
426
427 let mut created_block_groups = vec![];
429 let mut created_edges = vec![];
430 let mut created_nodes = vec![];
431 let mut created_sequences = vec![];
432 let mut created_bg_edges = vec![];
433 let mut created_samples = vec![];
434 let mut created_collections = vec![];
435 let mut created_paths = vec![];
436 let mut created_path_edges = vec![];
437 let mut created_accessions = vec![];
438 let mut created_accession_edges = vec![];
439 let mut created_accession_paths = vec![];
440 let mut created_annotation_groups = vec![];
441 let mut created_annotations = vec![];
442 let mut created_annotation_group_samples = vec![];
443
444 let mut previous_collections = HashSet::new();
446 let mut previous_samples = HashSet::new();
447 let mut previous_block_groups = HashSet::new();
448 let mut previous_edges = HashSet::new();
449 let mut previous_paths = HashSet::new();
450 let mut previous_accessions = HashSet::new();
451 let mut previous_nodes = HashSet::new();
452 let mut previous_sequences = HashSet::new();
453 let mut previous_accession_edges = HashSet::new();
454 let mut created_block_groups_set = HashSet::new();
455 let mut created_paths_set = HashSet::new();
456 let mut created_accessions_set = HashSet::new();
457 let mut created_edges_set = HashSet::new();
458 let mut created_accession_edges_set = HashSet::new();
459 let mut created_nodes_set = HashSet::new();
460 let mut created_sequences_set = HashSet::new();
461 let mut created_samples_set: HashSet<String> = HashSet::new();
462 let mut created_collections_set: HashSet<String> = HashSet::new();
463
464 let input: &mut dyn Read = &mut changes;
465 let mut iter = ChangesetIter::start_strm(&input).unwrap();
466
467 while let Some(item) = iter.next().unwrap() {
468 let op = item.op().unwrap();
469 if !op.indirect() {
471 let table = op.table_name();
472 let pk_column = item
473 .pk()
474 .unwrap()
475 .iter()
476 .find_position(|item| **item == 1)
477 .unwrap()
478 .0;
479 match table {
480 "sequences" => {
481 let hash = parse_hashid(item, pk_column);
482 let sequence = Sequence::new()
483 .sequence_type(&parse_string(item, 1))
484 .sequence(&parse_string(item, 2))
485 .name(&parse_string(item, 3))
486 .file_path(&parse_string(item, 4))
487 .length(parse_number(item, 5))
488 .build();
489 assert_eq!(hash, sequence.hash);
490 created_sequences.push(sequence);
491 created_sequences_set.insert(hash);
492 }
493 "block_groups" => {
494 let bg_pk = HashId(parse_blob(item, pk_column));
495 let collection = parse_string(item, 1);
496 let sample_name = parse_maybe_string(item, 2);
497 let name = parse_string(item, 3);
498 let created_on = parse_number(item, 4);
499
500 created_block_groups.push(BlockGroup {
501 id: bg_pk,
502 collection_name: collection.clone(),
503 sample_name: sample_name.clone(),
504 name,
505 created_on,
506 });
507
508 created_block_groups_set.insert(bg_pk);
509 if !created_collections_set.contains(&collection) {
510 previous_collections.insert(collection);
511 }
512 if let Some(sample_name) = sample_name
513 && !created_samples_set.contains(&sample_name)
514 {
515 previous_samples.insert(sample_name);
516 }
517 }
518 "nodes" => {
519 let node_id = parse_hashid(item, pk_column);
520 let sequence_hash = parse_hashid(item, 1);
521
522 created_nodes.push(Node {
523 id: node_id,
524 sequence_hash,
525 });
526
527 created_nodes_set.insert(node_id);
528 if !created_sequences_set.contains(&sequence_hash) {
529 previous_sequences.insert(sequence_hash);
530 }
531 }
532 "edges" => {
533 let edge_id = HashId(parse_blob(item, pk_column));
534 let source_node_id = parse_hashid(item, 1);
535 let target_node_id = parse_hashid(item, 4);
536
537 created_edges.push(Edge {
538 id: edge_id,
539 source_node_id,
540 source_coordinate: parse_number(item, 2),
541 source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
542 target_node_id,
543 target_coordinate: parse_number(item, 5),
544 target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
545 });
546
547 created_edges_set.insert(edge_id);
548 let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
549 for node in nodes.iter() {
550 if !created_nodes_set.contains(&node.id) && !is_terminal(node.id) {
551 previous_sequences.insert(node.sequence_hash);
552 previous_nodes.insert(node.id);
553 }
554 }
555 }
556 "block_group_edges" => {
557 let bg_id = HashId(parse_blob(item, 1));
558 let edge_id = HashId(parse_blob(item, 2));
559
560 created_bg_edges.push(BlockGroupEdge {
561 id: HashId(parse_blob(item, pk_column)),
562 block_group_id: bg_id,
563 edge_id,
564 chromosome_index: parse_number(item, 3),
565 phased: parse_number(item, 4),
566 created_on: parse_number(item, 5),
567 });
568
569 if !created_edges_set.contains(&edge_id) {
570 previous_edges.insert(edge_id);
571 }
572 if !created_block_groups_set.contains(&bg_id) {
573 previous_block_groups.insert(bg_id);
574 }
575 }
576 "samples" => {
577 let name = parse_string(item, pk_column);
578 created_samples.push(Sample { name: name.clone() });
579 created_samples_set.insert(name);
580 }
581 "collections" => {
582 let name = parse_string(item, pk_column);
583 created_collections.push(Collection { name: name.clone() });
584 created_collections_set.insert(name);
585 }
586 "paths" => {
587 let path_id = HashId(parse_blob(item, pk_column));
588 let bg_id = HashId(parse_blob(item, 1));
589
590 created_paths.push(Path {
591 id: path_id,
592 block_group_id: bg_id,
593 name: parse_string(item, 2),
594 created_on: parse_number(item, 3),
595 });
596
597 created_paths_set.insert(path_id);
598 if !created_block_groups_set.contains(&bg_id) {
599 previous_block_groups.insert(bg_id);
600 }
601 }
602 "path_edges" => {
603 let path_id = HashId(parse_blob(item, 1));
604 let edge_id = HashId(parse_blob(item, 2));
605
606 created_path_edges.push(PathEdge {
607 id: HashId(parse_blob(item, pk_column)),
608 path_id,
609 edge_id,
610 index_in_path: parse_number(item, 3),
611 });
612
613 if !created_paths_set.contains(&path_id) {
614 previous_paths.insert(path_id);
615 }
616 if !created_edges_set.contains(&edge_id) {
617 previous_edges.insert(edge_id);
618 }
619 }
620 "accessions" => {
621 let accession_id = HashId(parse_blob(item, pk_column));
622 let path_id = HashId(parse_blob(item, 2));
623 let parent_accession_id = parse_maybe_hashid(item, 3);
624
625 created_accessions.push(Accession {
626 id: accession_id,
627 name: parse_string(item, 1),
628 path_id,
629 parent_accession_id,
630 });
631
632 created_accessions_set.insert(accession_id);
633 if !created_paths_set.contains(&path_id) {
634 previous_paths.insert(path_id);
635 }
636 if let Some(id) = parent_accession_id
637 && !created_accessions_set.contains(&id)
638 {
639 previous_accessions.insert(id);
640 }
641 }
642 "accession_edges" => {
643 let edge_id = parse_hashid(item, pk_column);
644 let source_node_id = parse_hashid(item, 1);
645 let target_node_id = parse_hashid(item, 4);
646
647 created_accession_edges.push(AccessionEdge {
648 id: edge_id,
649 source_node_id,
650 source_coordinate: parse_number(item, 2),
651 source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
652 target_node_id,
653 target_coordinate: parse_number(item, 5),
654 target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
655 chromosome_index: parse_number(item, 7),
656 });
657
658 created_accession_edges_set.insert(edge_id);
659 let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
660 if !created_nodes_set.contains(&source_node_id) {
661 previous_sequences.insert(nodes[0].sequence_hash);
662 }
663 if source_node_id != target_node_id
664 && !created_nodes_set.contains(&target_node_id)
665 {
666 previous_sequences.insert(nodes[1].sequence_hash);
667 }
668 }
669 "accession_paths" => {
670 let accession_id = parse_hashid(item, 1);
671 let edge_id = parse_hashid(item, 3);
672
673 created_accession_paths.push(AccessionPath {
674 id: parse_hashid(item, pk_column),
675 accession_id,
676 index_in_path: parse_number(item, 2),
677 edge_id,
678 });
679
680 if !created_accessions_set.contains(&accession_id) {
681 previous_accessions.insert(accession_id);
682 }
683 if !created_accession_edges_set.contains(&edge_id) {
684 previous_accession_edges.insert(edge_id);
685 }
686 }
687 "annotations" => {
688 let id = parse_hashid(item, pk_column);
689 let name = parse_string(item, 1);
690 let group = parse_string(item, 2);
691 let accession_id = parse_hashid(item, 3);
692
693 created_annotations.push(Annotation {
694 id,
695 name,
696 group,
697 accession_id,
698 });
699
700 if !created_accessions_set.contains(&accession_id) {
701 previous_accessions.insert(accession_id);
702 }
703 }
704 "annotation_groups" => {
705 let name = parse_string(item, pk_column);
706
707 created_annotation_groups.push(AnnotationGroup { name });
708 }
709 "annotation_group_samples" => {
710 let annotation_group = parse_string(item, 0);
711 let sample_name = parse_string(item, 1);
712
713 created_annotation_group_samples.push(AnnotationGroupSample {
714 annotation_group,
715 sample_name: sample_name.clone(),
716 });
717
718 if !created_samples_set.contains(&sample_name) {
719 previous_samples.insert(sample_name);
720 }
721 }
722 t => {
723 println!("unhandled table {t}")
724 }
725 }
726 }
727 }
728
729 let existing_edges = Edge::query_by_ids(
731 conn,
732 &previous_edges.clone().into_iter().collect::<Vec<_>>(),
733 );
734 let mut new_nodes = vec![];
735 for edge in existing_edges.iter() {
736 if !previous_nodes.contains(&edge.source_node_id) && !is_terminal(edge.source_node_id) {
737 previous_nodes.insert(edge.source_node_id);
738 new_nodes.push(edge.source_node_id);
739 }
740 if !previous_nodes.contains(&edge.target_node_id) && !is_terminal(edge.target_node_id) {
741 previous_nodes.insert(edge.target_node_id);
742 new_nodes.push(edge.target_node_id);
743 }
744 }
745 for node in Node::query_by_ids(conn, &new_nodes) {
746 previous_sequences.insert(node.sequence_hash);
747 }
748
749 let changeset_models = ChangesetModels {
750 sequences: created_sequences,
751 block_groups: created_block_groups,
752 nodes: created_nodes,
753 edges: created_edges,
754 block_group_edges: created_bg_edges,
755 samples: created_samples,
756 collections: created_collections,
757 paths: created_paths,
758 path_edges: created_path_edges,
759 accessions: created_accessions,
760 accession_edges: created_accession_edges,
761 accession_paths: created_accession_paths,
762 annotation_groups: created_annotation_groups,
763 annotations: created_annotations,
764 annotation_group_samples: created_annotation_group_samples,
765 };
766
767 let dependency_models = DependencyModels {
768 collections: Collection::query_by_ids(conn, &previous_collections),
769 samples: Sample::query_by_ids(conn, &previous_samples),
770 sequences: Sequence::query_by_ids(conn, &previous_sequences),
771 block_group: BlockGroup::query_by_ids(conn, &previous_block_groups),
772 nodes: Node::query_by_ids(conn, &previous_nodes),
773 edges: Edge::query_by_ids(conn, &previous_edges),
774 paths: Path::query_by_ids(conn, &previous_paths),
775 accessions: Accession::query_by_ids(conn, &previous_accessions),
776 accession_edges: AccessionEdge::query_by_ids(conn, &previous_accession_edges),
777 };
778
779 (changeset_models, dependency_models)
780}
781
782pub fn apply_changeset(
783 conn: &GraphConnection,
784 changeset: &ChangesetModels,
785 dependencies: &DependencyModels,
786) -> Result<(), ChangesetError> {
787 for collection in dependencies.collections.iter() {
788 Collection::create(conn, &collection.name);
789 }
790
791 for sample in dependencies.samples.iter() {
792 Sample::get_or_create(conn, &sample.name);
793 }
794
795 for sequence in dependencies.sequences.iter() {
796 NewSequence::from(sequence).save(conn);
797 }
798 for node in dependencies.nodes.iter() {
799 if !is_terminal(node.id) {
800 assert!(Sequence::get_by_id(conn, &node.sequence_hash).is_some());
801 }
802 }
803
804 for bg in dependencies.block_group.iter() {
805 let sample_name = bg.sample_name.as_ref().map(|v| v as &str);
806 BlockGroup::create(conn, &bg.collection_name, sample_name, &bg.name);
807 }
808
809 for node in dependencies.nodes.iter() {
810 Node::create(conn, &node.sequence_hash, &node.id);
811 }
812
813 Edge::bulk_create(
814 conn,
815 &dependencies
816 .edges
817 .iter()
818 .map(EdgeData::from)
819 .collect::<Vec<_>>(),
820 );
821
822 for path in dependencies.paths.iter() {
823 Path::create(conn, &path.name, &path.block_group_id, &[]);
824 }
825
826 AccessionEdge::bulk_create(
827 conn,
828 &dependencies
829 .accession_edges
830 .iter()
831 .map(AccessionEdgeData::from)
832 .collect::<Vec<AccessionEdgeData>>(),
833 );
834
835 for accession in dependencies.accessions.iter() {
836 Accession::get_or_create(
837 conn,
838 &accession.name,
839 &accession.path_id,
840 accession.parent_accession_id.as_ref(),
841 );
842 }
843
844 for collection in &changeset.collections {
845 Collection::create(conn, &collection.name);
846 }
847 for sample in &changeset.samples {
848 Sample::get_or_create(conn, &sample.name);
849 }
850 for sequence in &changeset.sequences {
851 NewSequence::from(sequence).save(conn);
852 }
853 for bg in &changeset.block_groups {
854 BlockGroup::create(
855 conn,
856 &bg.collection_name,
857 bg.sample_name.as_deref(),
858 &bg.name,
859 );
860 }
861
862 for node in &changeset.nodes {
863 Node::create(conn, &node.sequence_hash, &node.id);
864 }
865
866 Edge::bulk_create(
867 conn,
868 &changeset
869 .edges
870 .iter()
871 .map(EdgeData::from)
872 .collect::<Vec<_>>(),
873 );
874 BlockGroupEdge::bulk_create(
875 conn,
876 &changeset
877 .block_group_edges
878 .iter()
879 .map(BlockGroupEdgeData::from)
880 .collect::<Vec<BlockGroupEdgeData>>(),
881 );
882
883 for path in &changeset.paths {
884 Path::create(conn, &path.name, &path.block_group_id, &[]);
885 let edges = changeset
886 .path_edges
887 .iter()
888 .filter(|edge| edge.path_id == path.id)
889 .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
890 .map(|path_edge| path_edge.edge_id);
891 PathEdge::bulk_create(conn, &path.id, &edges.collect::<Vec<_>>());
892 }
893
894 AccessionEdge::bulk_create(
895 conn,
896 &changeset
897 .accession_edges
898 .iter()
899 .map(AccessionEdgeData::from)
900 .collect::<Vec<_>>(),
901 );
902
903 for accession in &changeset.accessions {
904 Accession::get_or_create(
905 conn,
906 &accession.name,
907 &accession.path_id,
908 accession.parent_accession_id.as_ref(),
909 );
910 let edges = changeset
911 .accession_paths
912 .iter()
913 .filter(|ap| ap.accession_id == accession.id)
914 .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
915 .map(|ap| ap.edge_id);
916 AccessionPath::create(conn, &accession.id, &edges.collect::<Vec<_>>());
917 }
918
919 for annotation_group in &changeset.annotation_groups {
920 AnnotationGroup::get_or_create(conn, &annotation_group.name).map_err(|err| match err {
921 AnnotationGroupError::DatabaseError(inner) => inner,
922 })?;
923 }
924
925 for annotation in &changeset.annotations {
926 Annotation::get_or_create(
927 conn,
928 &annotation.name,
929 &annotation.group,
930 &annotation.accession_id,
931 )
932 .map_err(|err| match err {
933 AnnotationError::DatabaseError(inner) => inner,
934 AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
935 inner
936 }
937 })?;
938 }
939
940 for annotation_group_sample in &changeset.annotation_group_samples {
941 AnnotationGroupSample::create(
942 conn,
943 &annotation_group_sample.annotation_group,
944 &annotation_group_sample.sample_name,
945 )
946 .map_err(|err| match err {
947 AnnotationError::DatabaseError(inner) => inner,
948 AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
949 inner
950 }
951 })?;
952 }
953 Ok(())
954}
955
956pub fn revert_changeset(
957 conn: &GraphConnection,
958 changeset: &ChangesetModels,
959) -> Result<(), ChangesetError> {
960 for annotation_group_sample in &changeset.annotation_group_samples {
961 AnnotationGroupSample::delete(
962 conn,
963 &annotation_group_sample.annotation_group,
964 &annotation_group_sample.sample_name,
965 )
966 .map_err(|err| match err {
967 AnnotationError::DatabaseError(inner) => inner,
968 AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
969 inner
970 }
971 })?;
972 }
973 Annotation::delete_by_ids(
974 conn,
975 &changeset
976 .annotations
977 .iter()
978 .map(|obj| obj.id)
979 .collect::<Vec<_>>(),
980 );
981 AnnotationGroup::delete_by_ids(
982 conn,
983 &changeset
984 .annotation_groups
985 .iter()
986 .map(|group| group.name.clone())
987 .collect::<Vec<_>>(),
988 );
989 AccessionPath::delete_by_ids(
990 conn,
991 &changeset
992 .accession_paths
993 .iter()
994 .map(|obj| obj.id)
995 .collect::<Vec<_>>(),
996 );
997 AccessionEdge::delete_by_ids(
999 conn,
1000 &changeset
1001 .accession_edges
1002 .iter()
1003 .map(|pe| pe.id)
1004 .collect::<Vec<_>>(),
1005 );
1006 Accession::delete_by_ids(
1007 conn,
1008 &changeset
1009 .accessions
1010 .iter()
1011 .map(|obj| obj.id)
1012 .collect::<Vec<_>>(),
1013 );
1014 PathEdge::delete_by_ids(
1015 conn,
1016 &changeset
1017 .path_edges
1018 .iter()
1019 .map(|pe| pe.id)
1020 .collect::<Vec<_>>(),
1021 );
1022 Path::delete_by_ids(
1023 conn,
1024 &changeset.paths.iter().map(|p| p.id).collect::<Vec<_>>(),
1025 );
1026
1027 BlockGroupEdge::delete_by_ids(
1028 conn,
1029 &changeset
1030 .block_group_edges
1031 .iter()
1032 .map(|obj| obj.id)
1033 .collect::<Vec<_>>(),
1034 );
1035 Edge::delete_by_ids(
1036 conn,
1037 &changeset.edges.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1038 );
1039
1040 BlockGroup::delete_by_ids(
1041 conn,
1042 &changeset
1043 .block_groups
1044 .iter()
1045 .map(|obj| obj.id)
1046 .collect::<Vec<_>>(),
1047 );
1048
1049 Node::delete_by_ids(
1050 conn,
1051 &changeset.nodes.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1052 );
1053 Collection::delete_by_ids(
1054 conn,
1055 &changeset
1056 .collections
1057 .iter()
1058 .map(|obj| obj.name.clone())
1059 .collect::<Vec<_>>(),
1060 );
1061 Sample::delete_by_ids(
1062 conn,
1063 &changeset
1064 .samples
1065 .iter()
1066 .map(|obj| obj.name.clone())
1067 .collect::<Vec<_>>(),
1068 );
1069 Sequence::delete_by_ids(
1070 conn,
1071 &changeset
1072 .sequences
1073 .iter()
1074 .map(|obj| obj.hash)
1075 .collect::<Vec<_>>(),
1076 );
1077 Ok(())
1078}
1079
1080pub fn get_changeset_from_path(path: PathBuf) -> DatabaseChangeset {
1081 use capnp::serialize_packed;
1082 let file = fs::File::open(path).unwrap();
1083 let mut reader = std::io::BufReader::new(file);
1084
1085 let message_reader =
1086 serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1087 let root = message_reader
1088 .get_root::<database_changeset::Reader>()
1089 .unwrap();
1090 DatabaseChangeset::read_capnp(root)
1091}
1092
1093pub fn get_changeset_dependencies_from_path(path: PathBuf) -> DependencyModels {
1094 use capnp::serialize_packed;
1095
1096 let file = fs::File::open(path).unwrap();
1097 let mut reader = std::io::BufReader::new(file);
1098 let message_reader =
1099 serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1100 let root = message_reader
1101 .get_root::<crate::gen_models_capnp::dependency_models::Reader>()
1102 .unwrap();
1103 DependencyModels::read_capnp(root)
1104}
1105
1106pub fn write_changeset(
1107 workspace: &Workspace,
1108 operation: &Operation,
1109 changes: DatabaseChangeset,
1110 dependencies: &DependencyModels,
1111) {
1112 use capnp::{message::Builder, serialize_packed};
1113
1114 let change_path = operation.get_changeset_path(workspace);
1115 let dependency_path = operation.get_changeset_dependencies_path(workspace);
1116
1117 let mut dependency_file = fs::File::create_new(&dependency_path)
1119 .unwrap_or_else(|_| panic!("Unable to open {dependency_path:?}"));
1120 let mut message = Builder::new_default();
1121 let mut dep_root = message.init_root();
1122 dependencies.write_capnp(&mut dep_root);
1123 serialize_packed::write_message(&mut dependency_file, &message).unwrap();
1124
1125 let mut file = fs::File::create_new(&change_path)
1127 .unwrap_or_else(|_| panic!("Unable to open {change_path:?}"));
1128 let mut message = Builder::new_default();
1129 let mut change_root = message.init_root();
1130 changes.write_capnp(&mut change_root);
1131 serialize_packed::write_message(&mut file, &message).unwrap();
1132}
1133
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;
    use crate::{
        file_types::FileTypes,
        operations::{OperationFile, OperationInfo},
        session_operations::{end_operation, start_operation},
        test_helpers::{setup_block_group, setup_gen},
        traits::Query,
    };

    /// Builds a `ChangesetModels` with at least one record of every model type, so a
    /// capnp round trip exercises every list in the schema.
    ///
    /// The `created_on` fields are taken from `Utc::now()`, so two separate calls do
    /// not produce equal values; compare a value only against its own round trip.
    fn populated_changeset_models() -> ChangesetModels {
        ChangesetModels {
            collections: vec![Collection {
                name: "test_collection".to_string(),
            }],
            samples: vec![Sample {
                name: "test_sample".to_string(),
            }],
            sequences: vec![
                NewSequence::new()
                    .sequence("ATCG")
                    .sequence_type("DNA")
                    .name("test_seq")
                    .build(),
            ],
            block_groups: vec![BlockGroup {
                id: HashId::pad_str(1),
                collection_name: "test_collection".to_string(),
                sample_name: Some("test_sample".to_string()),
                name: "test_bg".to_string(),
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            nodes: vec![
                Node {
                    id: HashId::convert_str("node_hash"),
                    sequence_hash: HashId::convert_str("test_hash"),
                },
                Node {
                    id: HashId::convert_str("node_hash_2"),
                    sequence_hash: HashId::convert_str("test_hash_2"),
                },
            ],
            edges: vec![Edge {
                id: HashId::pad_str(1),
                source_node_id: HashId::convert_str("1"),
                source_coordinate: 0,
                source_strand: Strand::Forward,
                target_node_id: HashId::convert_str("2"),
                target_coordinate: 0,
                target_strand: Strand::Forward,
            }],
            block_group_edges: vec![BlockGroupEdge {
                id: HashId::pad_str(1),
                block_group_id: HashId::pad_str(1),
                edge_id: HashId::pad_str(1),
                chromosome_index: 0,
                phased: 0,
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            paths: vec![Path {
                id: HashId::pad_str(1),
                block_group_id: HashId::pad_str(1),
                name: "test_path".to_string(),
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            path_edges: vec![PathEdge {
                id: HashId::pad_str(1),
                path_id: HashId::pad_str(1),
                index_in_path: 0,
                edge_id: HashId::pad_str(1),
            }],
            accessions: vec![Accession {
                id: HashId::pad_str(1),
                name: "test_accession".to_string(),
                path_id: HashId::pad_str(1),
                parent_accession_id: None,
            }],
            accession_edges: vec![AccessionEdge {
                id: HashId::pad_str(1),
                source_node_id: HashId::convert_str("1"),
                source_coordinate: 0,
                source_strand: Strand::Forward,
                target_node_id: HashId::convert_str("2"),
                target_coordinate: 0,
                target_strand: Strand::Forward,
                chromosome_index: 0,
            }],
            accession_paths: vec![AccessionPath {
                id: HashId::pad_str(1),
                accession_id: HashId::pad_str(1),
                index_in_path: 0,
                edge_id: HashId::pad_str(1),
            }],
            annotation_groups: vec![AnnotationGroup {
                name: "gff3".to_string(),
            }],
            annotations: vec![Annotation {
                id: HashId::pad_str(1),
                name: "test_annotation".to_string(),
                group: "gff3".to_string(),
                accession_id: HashId::pad_str(1),
            }],
            annotation_group_samples: vec![AnnotationGroupSample {
                annotation_group: "gff3".to_string(),
                sample_name: "test_sample".to_string(),
            }],
        }
    }

    /// A fully populated `DatabaseChangeset` survives an in-memory capnp round trip.
    #[test]
    fn test_database_changeset_capnp_serialization() {
        use capnp::message::TypedBuilder;

        let changeset = DatabaseChangeset {
            db_path: "/path/to/db".to_string(),
            changes: populated_changeset_models(),
        };

        let mut message = TypedBuilder::<database_changeset::Owned>::new_default();
        let mut root = message.init_root();
        changeset.write_capnp(&mut root);

        let deserialized = DatabaseChangeset::read_capnp(root.into_reader());
        assert_eq!(changeset, deserialized);
    }

    /// `DatabaseChangeset::get_db_path` reads the db path back from a packed file on disk.
    #[test]
    fn test_database_changeset_get_db_path() {
        use std::io::Write;

        use capnp::{message::Builder, serialize_packed};
        use tempfile::NamedTempFile;

        let expected_db_path = "/tmp/test_db_path";
        let changeset = DatabaseChangeset {
            db_path: expected_db_path.to_string(),
            changes: ChangesetModels {
                collections: vec![],
                samples: vec![],
                sequences: vec![],
                block_groups: vec![],
                nodes: vec![],
                edges: vec![],
                block_group_edges: vec![],
                paths: vec![],
                path_edges: vec![],
                accessions: vec![],
                accession_edges: vec![],
                accession_paths: vec![],
                annotation_groups: vec![],
                annotations: vec![],
                annotation_group_samples: vec![],
            },
        };

        let mut temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        let mut message = Builder::new_default();
        let mut root = message.init_root();
        changeset.write_capnp(&mut root);
        serialize_packed::write_message(&mut temp_file, &message).unwrap();
        temp_file.flush().unwrap();

        let db_path = DatabaseChangeset::get_db_path(path.as_path());

        assert_eq!(db_path, expected_db_path);
    }

    /// A fully populated `ChangesetModels` survives an in-memory capnp round trip.
    #[test]
    fn test_changeset_models_capnp_serialization() {
        use capnp::message::TypedBuilder;

        let models = populated_changeset_models();

        let mut message = TypedBuilder::<changeset_models::Owned>::new_default();
        let mut root = message.init_root();
        models.write_capnp(&mut root);

        let deserialized = ChangesetModels::read_capnp(root.into_reader());

        assert_eq!(models, deserialized);
    }

    /// Annotations created inside an operation land in its changeset, and the
    /// pre-existing sample and accession they reference are recorded as dependencies.
    #[test]
    fn test_changeset_includes_annotations() {
        use crate::block_group::PathCache;

        let context = setup_gen();
        let conn = context.graph().conn();
        let op_conn = context.operations().conn();

        let db_uuid = crate::metadata::get_db_uuid(conn);
        crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path").unwrap();

        let _ = Sample::create(conn, "sample-1").unwrap();
        let (block_group_id, path) = setup_block_group(conn);
        let mut cache = PathCache::new(conn);
        let _ = PathCache::lookup(&mut cache, &block_group_id, path.name.clone());
        let accession = BlockGroup::add_accession(conn, &path, "ann-accession", 0, 5, &mut cache);

        let mut session = start_operation(conn);
        let annotation = Annotation::get_or_create(conn, "gene-a", "gff3", &accession.id).unwrap();
        annotation.add_samples(conn, &["sample-1"]).unwrap();

        let operation = end_operation(
            &context,
            &mut session,
            &OperationInfo {
                files: vec![],
                description: "annotation op".to_string(),
            },
            "annotation op",
            None,
        )
        .unwrap();

        let changeset = operation.get_changeset(context.workspace());
        assert_eq!(
            changeset.changes.annotation_groups,
            vec![AnnotationGroup {
                name: "gff3".to_string(),
            }]
        );
        assert_eq!(changeset.changes.annotations, vec![annotation.clone()]);
        assert_eq!(
            changeset.changes.annotation_group_samples,
            vec![AnnotationGroupSample {
                annotation_group: annotation.group.clone(),
                sample_name: "sample-1".to_string(),
            }]
        );

        let dependencies = operation.get_changeset_dependencies(context.workspace());
        assert_eq!(dependencies.samples.len(), 1);
        assert_eq!(dependencies.samples[0].name, "sample-1");
        assert_eq!(dependencies.accessions.len(), 1);
        assert_eq!(dependencies.accessions[0].id, accession.id);
    }

    mod changeset_dependencies {
        use super::*;

        /// Reusing an edge from an existing block group records the node (and its
        /// sequence) that the edge points at as a dependency of the operation.
        #[test]
        fn test_tracks_nodes_and_sequences_from_previous_block_group_edges() {
            let context = setup_gen();
            let conn = context.graph().conn();
            let op_conn = context.operations().conn();

            let db_uuid = crate::metadata::get_db_uuid(conn);
            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
                .unwrap();

            let (bg_id, _path) = setup_block_group(conn);
            let old_edges = BlockGroupEdge::edges_for_block_group(conn, &bg_id);

            let mut session = start_operation(conn);
            let _ = Sample::create(conn, "new").unwrap();
            let new_bg = BlockGroup::create(conn, "test", Some("new"), "new-bg");
            let shared_edge = old_edges[0].edge.clone();
            BlockGroupEdge::bulk_create(
                conn,
                &[BlockGroupEdgeData {
                    block_group_id: new_bg.id,
                    edge_id: shared_edge.id,
                    chromosome_index: 0,
                    phased: 0,
                }],
            );
            let operation = end_operation(
                &context,
                &mut session,
                &OperationInfo {
                    files: vec![],
                    description: "test".to_string(),
                },
                "test",
                None,
            )
            .unwrap();

            let dependencies = operation.get_changeset_dependencies(context.workspace());
            // Check lengths before indexing so a mismatch reports counts, not an
            // index-out-of-bounds panic.
            assert_eq!(dependencies.nodes.len(), 1);
            assert_eq!(dependencies.nodes[0].id, shared_edge.target_node_id);
            let nodes = Node::query_by_ids(conn, &[shared_edge.target_node_id]);
            assert_eq!(dependencies.sequences.len(), 1);
            assert_eq!(dependencies.sequences[0].hash, nodes[0].sequence_hash);
        }

        /// Patching an existing block group with a new edge into a pre-existing node
        /// records that node's sequence and the patched block group as dependencies.
        #[test]
        fn test_records_patch_dependencies() {
            let context = setup_gen();
            let conn = context.graph().conn();
            let op_conn = context.operations().conn();

            let db_uuid = crate::metadata::get_db_uuid(conn);
            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
                .unwrap();

            let (bg_id, _path) = setup_block_group(conn);
            let dep_bg = BlockGroup::get_by_id(conn, &bg_id);

            let existing_seq = Sequence::new()
                .sequence_type("DNA")
                .sequence("AAAATTTT")
                .save(conn);
            let existing_node_id =
                Node::create(conn, &existing_seq.hash, &HashId::convert_str("1"));

            let mut session = start_operation(conn);

            let random_seq = Sequence::new()
                .sequence_type("DNA")
                .sequence("ATCG")
                .save(conn);
            let random_node_id = Node::create(conn, &random_seq.hash, &HashId::convert_str("2"));

            let new_edge = Edge::create(
                conn,
                random_node_id,
                0,
                Strand::Forward,
                existing_node_id,
                0,
                Strand::Forward,
            );
            let block_group_edge = BlockGroupEdgeData {
                block_group_id: bg_id,
                edge_id: new_edge.id,
                chromosome_index: 0,
                phased: 0,
            };
            BlockGroupEdge::bulk_create(conn, &[block_group_edge]);
            let operation = end_operation(
                &context,
                &mut session,
                &OperationInfo {
                    files: vec![OperationFile {
                        file_path: "test".to_string(),
                        file_type: FileTypes::None,
                    }],
                    description: "test".to_string(),
                },
                "test",
                None,
            )
            .unwrap();

            let dependencies = operation.get_changeset_dependencies(context.workspace());
            assert_eq!(dependencies.sequences.len(), 1);
            assert_eq!(
                dependencies.block_group[0].collection_name,
                dep_bg.collection_name
            );
            assert_eq!(dependencies.block_group[0].name, dep_bg.name);
            assert_eq!(dependencies.block_group[0].sample_name, dep_bg.sample_name);
        }
    }
}