Skip to main content

gen_models/
changesets.rs

1use std::{
2    collections::HashSet,
3    convert::TryInto,
4    fs,
5    io::Read,
6    path::{Path as StdPath, PathBuf},
7    str,
8};
9
10use gen_core::{HashId, Strand, config::Workspace, is_terminal, traits::Capnp};
11use itertools::Itertools;
12use rusqlite::{
13    session::{ChangesetItem, ChangesetIter},
14    types::FromSql,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::{
19    accession::{Accession, AccessionEdge, AccessionEdgeData, AccessionPath},
20    annotations::{
21        Annotation, AnnotationError, AnnotationGroup, AnnotationGroupError, AnnotationGroupSample,
22    },
23    block_group::BlockGroup,
24    block_group_edge::{BlockGroupEdge, BlockGroupEdgeData},
25    collection::Collection,
26    db::GraphConnection,
27    edge::{Edge, EdgeData},
28    errors::ChangesetError,
29    gen_models_capnp::{changeset_models, database_changeset},
30    node::Node,
31    operations::Operation,
32    path::Path,
33    path_edge::PathEdge,
34    sample::Sample,
35    sequence::{NewSequence, Sequence},
36    session_operations::DependencyModels,
37    traits::Query,
38};
39
/// A set of changed models together with the path of the database they belong to.
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct DatabaseChangeset {
    /// Database path stored alongside the changes.
    pub db_path: String,
    /// The models carried by this changeset.
    pub changes: ChangesetModels,
}
45
46impl DatabaseChangeset {
47    pub fn get_db_path(path: &StdPath) -> String {
48        use capnp::serialize_packed;
49
50        let file = fs::File::open(path).unwrap();
51        let mut reader = std::io::BufReader::new(file);
52
53        let message_reader =
54            serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new())
55                .unwrap();
56        let root = message_reader
57            .get_root::<database_changeset::Reader>()
58            .unwrap();
59        root.get_db_path().unwrap().to_string().unwrap()
60    }
61}
62
63impl<'a> Capnp<'a> for DatabaseChangeset {
64    type Builder = database_changeset::Builder<'a>;
65    type Reader = database_changeset::Reader<'a>;
66
67    fn write_capnp(&self, builder: &mut Self::Builder) {
68        builder.set_db_path(&self.db_path);
69        let mut changeset_builder = builder.reborrow().init_changes();
70        self.changes.write_capnp(&mut changeset_builder);
71    }
72
73    fn read_capnp(reader: Self::Reader) -> Self {
74        let db_path = reader.get_db_path().unwrap().to_string().unwrap();
75        let changeset_reader = reader.get_changes().unwrap();
76        let changes = ChangesetModels::read_capnp(changeset_reader);
77        DatabaseChangeset { db_path, changes }
78    }
79}
80
81#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
82pub struct ChangesetModels {
83    pub collections: Vec<crate::collection::Collection>,
84    pub samples: Vec<crate::sample::Sample>,
85    pub sequences: Vec<Sequence>,
86    pub block_groups: Vec<BlockGroup>,
87    pub nodes: Vec<Node>,
88    pub edges: Vec<Edge>,
89    pub block_group_edges: Vec<crate::block_group_edge::BlockGroupEdge>,
90    pub paths: Vec<Path>,
91    pub path_edges: Vec<PathEdge>,
92    pub accessions: Vec<Accession>,
93    pub accession_edges: Vec<AccessionEdge>,
94    pub accession_paths: Vec<AccessionPath>,
95    pub annotation_groups: Vec<AnnotationGroup>,
96    pub annotations: Vec<Annotation>,
97    pub annotation_group_samples: Vec<AnnotationGroupSample>,
98}
99
100impl<'a> Capnp<'a> for ChangesetModels {
101    type Builder = changeset_models::Builder<'a>;
102    type Reader = changeset_models::Reader<'a>;
103
104    fn write_capnp(&self, builder: &mut Self::Builder) {
105        // Write collections
106        let mut collections_builder = builder
107            .reborrow()
108            .init_collections(self.collections.len() as u32);
109        for (i, collection) in self.collections.iter().enumerate() {
110            let mut collection_builder = collections_builder.reborrow().get(i as u32);
111            collection.write_capnp(&mut collection_builder);
112        }
113
114        // Write samples
115        let mut samples_builder = builder.reborrow().init_samples(self.samples.len() as u32);
116        for (i, sample) in self.samples.iter().enumerate() {
117            let mut sample_builder = samples_builder.reborrow().get(i as u32);
118            sample.write_capnp(&mut sample_builder);
119        }
120
121        // Write sequences
122        let mut sequences_builder = builder
123            .reborrow()
124            .init_sequences(self.sequences.len() as u32);
125        for (i, sequence) in self.sequences.iter().enumerate() {
126            let mut sequence_builder = sequences_builder.reborrow().get(i as u32);
127            sequence.write_capnp(&mut sequence_builder);
128        }
129
130        // Write block groups
131        let mut block_groups_builder = builder
132            .reborrow()
133            .init_block_groups(self.block_groups.len() as u32);
134        for (i, block_group) in self.block_groups.iter().enumerate() {
135            let mut block_group_builder = block_groups_builder.reborrow().get(i as u32);
136            block_group.write_capnp(&mut block_group_builder);
137        }
138
139        // Write nodes
140        let mut nodes_builder = builder.reborrow().init_nodes(self.nodes.len() as u32);
141        for (i, node) in self.nodes.iter().enumerate() {
142            let mut node_builder = nodes_builder.reborrow().get(i as u32);
143            node.write_capnp(&mut node_builder);
144        }
145
146        // Write edges
147        let mut edges_builder = builder.reborrow().init_edges(self.edges.len() as u32);
148        for (i, edge) in self.edges.iter().enumerate() {
149            let mut edge_builder = edges_builder.reborrow().get(i as u32);
150            edge.write_capnp(&mut edge_builder);
151        }
152
153        // Write block group edges
154        let mut block_group_edges_builder = builder
155            .reborrow()
156            .init_block_group_edges(self.block_group_edges.len() as u32);
157        for (i, block_group_edge) in self.block_group_edges.iter().enumerate() {
158            let mut block_group_edge_builder = block_group_edges_builder.reborrow().get(i as u32);
159            block_group_edge.write_capnp(&mut block_group_edge_builder);
160        }
161
162        // Write paths
163        let mut paths_builder = builder.reborrow().init_paths(self.paths.len() as u32);
164        for (i, path) in self.paths.iter().enumerate() {
165            let mut path_builder = paths_builder.reborrow().get(i as u32);
166            path.write_capnp(&mut path_builder);
167        }
168
169        // Write path edges
170        let mut path_edges_builder = builder
171            .reborrow()
172            .init_path_edges(self.path_edges.len() as u32);
173        for (i, path_edge) in self.path_edges.iter().enumerate() {
174            let mut path_edge_builder = path_edges_builder.reborrow().get(i as u32);
175            path_edge.write_capnp(&mut path_edge_builder);
176        }
177
178        // Write accessions
179        let mut accessions_builder = builder
180            .reborrow()
181            .init_accessions(self.accessions.len() as u32);
182        for (i, accession) in self.accessions.iter().enumerate() {
183            let mut accession_builder = accessions_builder.reborrow().get(i as u32);
184            accession.write_capnp(&mut accession_builder);
185        }
186
187        // Write accession edges
188        let mut accession_edges_builder = builder
189            .reborrow()
190            .init_accession_edges(self.accession_edges.len() as u32);
191        for (i, accession_edge) in self.accession_edges.iter().enumerate() {
192            let mut accession_edge_builder = accession_edges_builder.reborrow().get(i as u32);
193            accession_edge.write_capnp(&mut accession_edge_builder);
194        }
195
196        // Write accession paths
197        let mut accession_paths_builder = builder
198            .reborrow()
199            .init_accession_paths(self.accession_paths.len() as u32);
200        for (i, accession_path) in self.accession_paths.iter().enumerate() {
201            let mut accession_path_builder = accession_paths_builder.reborrow().get(i as u32);
202            accession_path.write_capnp(&mut accession_path_builder);
203        }
204
205        // Write annotation groups
206        let mut annotation_groups_builder = builder
207            .reborrow()
208            .init_annotation_groups(self.annotation_groups.len() as u32);
209        for (i, annotation_group) in self.annotation_groups.iter().enumerate() {
210            let mut annotation_group_builder = annotation_groups_builder.reborrow().get(i as u32);
211            annotation_group.write_capnp(&mut annotation_group_builder);
212        }
213
214        // Write annotations
215        let mut annotations_builder = builder
216            .reborrow()
217            .init_annotations(self.annotations.len() as u32);
218        for (i, annotation) in self.annotations.iter().enumerate() {
219            let mut annotation_builder = annotations_builder.reborrow().get(i as u32);
220            annotation.write_capnp(&mut annotation_builder);
221        }
222
223        // Write annotation group samples
224        let mut annotation_group_samples_builder = builder
225            .reborrow()
226            .init_annotation_group_samples(self.annotation_group_samples.len() as u32);
227        for (i, annotation_group_sample) in self.annotation_group_samples.iter().enumerate() {
228            let mut annotation_group_sample_builder =
229                annotation_group_samples_builder.reborrow().get(i as u32);
230            annotation_group_sample.write_capnp(&mut annotation_group_sample_builder);
231        }
232    }
233
234    fn read_capnp(reader: Self::Reader) -> Self {
235        // Read collections
236        let collections_reader = reader.get_collections().unwrap();
237        let mut collections = Vec::new();
238        for collection_reader in collections_reader.iter() {
239            collections.push(crate::collection::Collection::read_capnp(collection_reader));
240        }
241
242        // Read samples
243        let samples_reader = reader.get_samples().unwrap();
244        let mut samples = Vec::new();
245        for sample_reader in samples_reader.iter() {
246            samples.push(crate::sample::Sample::read_capnp(sample_reader));
247        }
248
249        // Read sequences
250        let sequences_reader = reader.get_sequences().unwrap();
251        let mut sequences = Vec::new();
252        for sequence_reader in sequences_reader.iter() {
253            sequences.push(Sequence::read_capnp(sequence_reader));
254        }
255
256        // Read block groups
257        let block_groups_reader = reader.get_block_groups().unwrap();
258        let mut block_groups = Vec::new();
259        for block_group_reader in block_groups_reader.iter() {
260            block_groups.push(BlockGroup::read_capnp(block_group_reader));
261        }
262
263        // Read nodes
264        let nodes_reader = reader.get_nodes().unwrap();
265        let mut nodes = Vec::new();
266        for node_reader in nodes_reader.iter() {
267            nodes.push(Node::read_capnp(node_reader));
268        }
269
270        // Read edges
271        let edges_reader = reader.get_edges().unwrap();
272        let mut edges = Vec::new();
273        for edge_reader in edges_reader.iter() {
274            edges.push(Edge::read_capnp(edge_reader));
275        }
276
277        // Read block group edges
278        let block_group_edges_reader = reader.get_block_group_edges().unwrap();
279        let mut block_group_edges = Vec::new();
280        for block_group_edge_reader in block_group_edges_reader.iter() {
281            block_group_edges.push(crate::block_group_edge::BlockGroupEdge::read_capnp(
282                block_group_edge_reader,
283            ));
284        }
285
286        // Read paths
287        let paths_reader = reader.get_paths().unwrap();
288        let mut paths = Vec::new();
289        for path_reader in paths_reader.iter() {
290            paths.push(Path::read_capnp(path_reader));
291        }
292
293        // Read path edges
294        let path_edges_reader = reader.get_path_edges().unwrap();
295        let mut path_edges = Vec::new();
296        for path_edge_reader in path_edges_reader.iter() {
297            path_edges.push(PathEdge::read_capnp(path_edge_reader));
298        }
299
300        // Read accessions
301        let accessions_reader = reader.get_accessions().unwrap();
302        let mut accessions = Vec::new();
303        for accession_reader in accessions_reader.iter() {
304            accessions.push(Accession::read_capnp(accession_reader));
305        }
306
307        // Read accession edges
308        let accession_edges_reader = reader.get_accession_edges().unwrap();
309        let mut accession_edges = Vec::new();
310        for accession_edge_reader in accession_edges_reader.iter() {
311            accession_edges.push(AccessionEdge::read_capnp(accession_edge_reader));
312        }
313
314        // Read accession paths
315        let accession_paths_reader = reader.get_accession_paths().unwrap();
316        let mut accession_paths = Vec::new();
317        for accession_path_reader in accession_paths_reader.iter() {
318            accession_paths.push(AccessionPath::read_capnp(accession_path_reader));
319        }
320
321        // Read annotation groups
322        let annotation_groups_reader = reader.get_annotation_groups().unwrap();
323        let mut annotation_groups = Vec::new();
324        for annotation_group_reader in annotation_groups_reader.iter() {
325            annotation_groups.push(AnnotationGroup::read_capnp(annotation_group_reader));
326        }
327
328        // Read annotations
329        let annotations_reader = reader.get_annotations().unwrap();
330        let mut annotations = Vec::new();
331        for annotation_reader in annotations_reader.iter() {
332            annotations.push(Annotation::read_capnp(annotation_reader));
333        }
334
335        // Read annotation group samples
336        let annotation_group_samples_reader = reader.get_annotation_group_samples().unwrap();
337        let mut annotation_group_samples = Vec::new();
338        for annotation_group_sample_reader in annotation_group_samples_reader.iter() {
339            annotation_group_samples.push(AnnotationGroupSample::read_capnp(
340                annotation_group_sample_reader,
341            ));
342        }
343
344        ChangesetModels {
345            collections,
346            samples,
347            sequences,
348            block_groups,
349            nodes,
350            edges,
351            block_group_edges,
352            paths,
353            path_edges,
354            accessions,
355            accession_edges,
356            accession_paths,
357            annotation_groups,
358            annotations,
359            annotation_group_samples,
360        }
361    }
362}
363
364// Helper functions for parsing changeset items
365pub fn parse_string(item: &ChangesetItem, col: usize) -> String {
366    str::from_utf8(item.new_value(col).unwrap().as_bytes().unwrap())
367        .unwrap()
368        .to_string()
369}
370
371pub fn parse_maybe_string(item: &ChangesetItem, col: usize) -> Option<String> {
372    item.new_value(col)
373        .unwrap()
374        .as_bytes_or_null()
375        .unwrap()
376        .map(|v| str::from_utf8(v).unwrap().to_string())
377}
378
379pub fn parse_blob(item: &ChangesetItem, col: usize) -> [u8; 32] {
380    let bytes = item.new_value(col).unwrap().as_bytes().unwrap();
381
382    bytes.try_into().expect("blob must be exactly 32 bytes")
383}
384
385pub fn parse_maybe_blob(item: &ChangesetItem, col: usize) -> Option<[u8; 32]> {
386    item.new_value(col)
387        .unwrap()
388        .as_bytes_or_null()
389        .unwrap()
390        .map(|v| v.try_into().expect("blob must be exactly 32 bytes"))
391}
392
393pub fn parse_hashid(item: &ChangesetItem, col: usize) -> HashId {
394    HashId(parse_blob(item, col))
395}
396
397pub fn parse_maybe_hashid(item: &ChangesetItem, col: usize) -> Option<HashId> {
398    parse_maybe_blob(item, col).map(HashId)
399}
400
401pub fn parse_number(item: &ChangesetItem, col: usize) -> i64 {
402    item.new_value(col).unwrap().as_i64().unwrap()
403}
404
405pub fn parse_maybe_number(item: &ChangesetItem, col: usize) -> Option<i64> {
406    item.new_value(col).unwrap().as_i64_or_null().unwrap()
407}
408
409pub fn process_changesetiter(
410    conn: &GraphConnection,
411    mut changes: &[u8],
412) -> (ChangesetModels, DependencyModels) {
413    use fallible_streaming_iterator::FallibleStreamingIterator;
414    use gen_core::is_terminal;
415    use itertools::Itertools;
416
417    use crate::{
418        accession::{Accession, AccessionEdge},
419        block_group::BlockGroup,
420        edge::Edge,
421        node::Node,
422        path::Path,
423        sequence::Sequence,
424        traits::Query,
425    };
426
427    // Initialize collections for changeset models
428    let mut created_block_groups = vec![];
429    let mut created_edges = vec![];
430    let mut created_nodes = vec![];
431    let mut created_sequences = vec![];
432    let mut created_bg_edges = vec![];
433    let mut created_samples = vec![];
434    let mut created_collections = vec![];
435    let mut created_paths = vec![];
436    let mut created_path_edges = vec![];
437    let mut created_accessions = vec![];
438    let mut created_accession_edges = vec![];
439    let mut created_accession_paths = vec![];
440    let mut created_annotation_groups = vec![];
441    let mut created_annotations = vec![];
442    let mut created_annotation_group_samples = vec![];
443
444    // Initialize collections for dependency tracking
445    let mut previous_collections = HashSet::new();
446    let mut previous_samples = HashSet::new();
447    let mut previous_block_groups = HashSet::new();
448    let mut previous_edges = HashSet::new();
449    let mut previous_paths = HashSet::new();
450    let mut previous_accessions = HashSet::new();
451    let mut previous_nodes = HashSet::new();
452    let mut previous_sequences = HashSet::new();
453    let mut previous_accession_edges = HashSet::new();
454    let mut created_block_groups_set = HashSet::new();
455    let mut created_paths_set = HashSet::new();
456    let mut created_accessions_set = HashSet::new();
457    let mut created_edges_set = HashSet::new();
458    let mut created_accession_edges_set = HashSet::new();
459    let mut created_nodes_set = HashSet::new();
460    let mut created_sequences_set = HashSet::new();
461    let mut created_samples_set: HashSet<String> = HashSet::new();
462    let mut created_collections_set: HashSet<String> = HashSet::new();
463
464    let input: &mut dyn Read = &mut changes;
465    let mut iter = ChangesetIter::start_strm(&input).unwrap();
466
467    while let Some(item) = iter.next().unwrap() {
468        let op = item.op().unwrap();
469        // info on indirect changes: https://www.sqlite.org/draft/session/sqlite3session_indirect.html
470        if !op.indirect() {
471            let table = op.table_name();
472            let pk_column = item
473                .pk()
474                .unwrap()
475                .iter()
476                .find_position(|item| **item == 1)
477                .unwrap()
478                .0;
479            match table {
480                "sequences" => {
481                    let hash = parse_hashid(item, pk_column);
482                    let sequence = Sequence::new()
483                        .sequence_type(&parse_string(item, 1))
484                        .sequence(&parse_string(item, 2))
485                        .name(&parse_string(item, 3))
486                        .file_path(&parse_string(item, 4))
487                        .length(parse_number(item, 5))
488                        .build();
489                    assert_eq!(hash, sequence.hash);
490                    created_sequences.push(sequence);
491                    created_sequences_set.insert(hash);
492                }
493                "block_groups" => {
494                    let bg_pk = HashId(parse_blob(item, pk_column));
495                    let collection = parse_string(item, 1);
496                    let sample_name = parse_maybe_string(item, 2);
497                    let name = parse_string(item, 3);
498                    let created_on = parse_number(item, 4);
499
500                    created_block_groups.push(BlockGroup {
501                        id: bg_pk,
502                        collection_name: collection.clone(),
503                        sample_name: sample_name.clone(),
504                        name,
505                        created_on,
506                    });
507
508                    created_block_groups_set.insert(bg_pk);
509                    if !created_collections_set.contains(&collection) {
510                        previous_collections.insert(collection);
511                    }
512                    if let Some(sample_name) = sample_name
513                        && !created_samples_set.contains(&sample_name)
514                    {
515                        previous_samples.insert(sample_name);
516                    }
517                }
518                "nodes" => {
519                    let node_id = parse_hashid(item, pk_column);
520                    let sequence_hash = parse_hashid(item, 1);
521
522                    created_nodes.push(Node {
523                        id: node_id,
524                        sequence_hash,
525                    });
526
527                    created_nodes_set.insert(node_id);
528                    if !created_sequences_set.contains(&sequence_hash) {
529                        previous_sequences.insert(sequence_hash);
530                    }
531                }
532                "edges" => {
533                    let edge_id = HashId(parse_blob(item, pk_column));
534                    let source_node_id = parse_hashid(item, 1);
535                    let target_node_id = parse_hashid(item, 4);
536
537                    created_edges.push(Edge {
538                        id: edge_id,
539                        source_node_id,
540                        source_coordinate: parse_number(item, 2),
541                        source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
542                        target_node_id,
543                        target_coordinate: parse_number(item, 5),
544                        target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
545                    });
546
547                    created_edges_set.insert(edge_id);
548                    let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
549                    for node in nodes.iter() {
550                        if !created_nodes_set.contains(&node.id) && !is_terminal(node.id) {
551                            previous_sequences.insert(node.sequence_hash);
552                            previous_nodes.insert(node.id);
553                        }
554                    }
555                }
556                "block_group_edges" => {
557                    let bg_id = HashId(parse_blob(item, 1));
558                    let edge_id = HashId(parse_blob(item, 2));
559
560                    created_bg_edges.push(BlockGroupEdge {
561                        id: HashId(parse_blob(item, pk_column)),
562                        block_group_id: bg_id,
563                        edge_id,
564                        chromosome_index: parse_number(item, 3),
565                        phased: parse_number(item, 4),
566                        created_on: parse_number(item, 5),
567                    });
568
569                    if !created_edges_set.contains(&edge_id) {
570                        previous_edges.insert(edge_id);
571                    }
572                    if !created_block_groups_set.contains(&bg_id) {
573                        previous_block_groups.insert(bg_id);
574                    }
575                }
576                "samples" => {
577                    let name = parse_string(item, pk_column);
578                    created_samples.push(Sample { name: name.clone() });
579                    created_samples_set.insert(name);
580                }
581                "collections" => {
582                    let name = parse_string(item, pk_column);
583                    created_collections.push(Collection { name: name.clone() });
584                    created_collections_set.insert(name);
585                }
586                "paths" => {
587                    let path_id = HashId(parse_blob(item, pk_column));
588                    let bg_id = HashId(parse_blob(item, 1));
589
590                    created_paths.push(Path {
591                        id: path_id,
592                        block_group_id: bg_id,
593                        name: parse_string(item, 2),
594                        created_on: parse_number(item, 3),
595                    });
596
597                    created_paths_set.insert(path_id);
598                    if !created_block_groups_set.contains(&bg_id) {
599                        previous_block_groups.insert(bg_id);
600                    }
601                }
602                "path_edges" => {
603                    let path_id = HashId(parse_blob(item, 1));
604                    let edge_id = HashId(parse_blob(item, 2));
605
606                    created_path_edges.push(PathEdge {
607                        id: HashId(parse_blob(item, pk_column)),
608                        path_id,
609                        edge_id,
610                        index_in_path: parse_number(item, 3),
611                    });
612
613                    if !created_paths_set.contains(&path_id) {
614                        previous_paths.insert(path_id);
615                    }
616                    if !created_edges_set.contains(&edge_id) {
617                        previous_edges.insert(edge_id);
618                    }
619                }
620                "accessions" => {
621                    let accession_id = HashId(parse_blob(item, pk_column));
622                    let path_id = HashId(parse_blob(item, 2));
623                    let parent_accession_id = parse_maybe_hashid(item, 3);
624
625                    created_accessions.push(Accession {
626                        id: accession_id,
627                        name: parse_string(item, 1),
628                        path_id,
629                        parent_accession_id,
630                    });
631
632                    created_accessions_set.insert(accession_id);
633                    if !created_paths_set.contains(&path_id) {
634                        previous_paths.insert(path_id);
635                    }
636                    if let Some(id) = parent_accession_id
637                        && !created_accessions_set.contains(&id)
638                    {
639                        previous_accessions.insert(id);
640                    }
641                }
642                "accession_edges" => {
643                    let edge_id = parse_hashid(item, pk_column);
644                    let source_node_id = parse_hashid(item, 1);
645                    let target_node_id = parse_hashid(item, 4);
646
647                    created_accession_edges.push(AccessionEdge {
648                        id: edge_id,
649                        source_node_id,
650                        source_coordinate: parse_number(item, 2),
651                        source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
652                        target_node_id,
653                        target_coordinate: parse_number(item, 5),
654                        target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
655                        chromosome_index: parse_number(item, 7),
656                    });
657
658                    created_accession_edges_set.insert(edge_id);
659                    let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
660                    if !created_nodes_set.contains(&source_node_id) {
661                        previous_sequences.insert(nodes[0].sequence_hash);
662                    }
663                    if source_node_id != target_node_id
664                        && !created_nodes_set.contains(&target_node_id)
665                    {
666                        previous_sequences.insert(nodes[1].sequence_hash);
667                    }
668                }
669                "accession_paths" => {
670                    let accession_id = parse_hashid(item, 1);
671                    let edge_id = parse_hashid(item, 3);
672
673                    created_accession_paths.push(AccessionPath {
674                        id: parse_hashid(item, pk_column),
675                        accession_id,
676                        index_in_path: parse_number(item, 2),
677                        edge_id,
678                    });
679
680                    if !created_accessions_set.contains(&accession_id) {
681                        previous_accessions.insert(accession_id);
682                    }
683                    if !created_accession_edges_set.contains(&edge_id) {
684                        previous_accession_edges.insert(edge_id);
685                    }
686                }
687                "annotations" => {
688                    let id = parse_hashid(item, pk_column);
689                    let name = parse_string(item, 1);
690                    let group = parse_string(item, 2);
691                    let accession_id = parse_hashid(item, 3);
692
693                    created_annotations.push(Annotation {
694                        id,
695                        name,
696                        group,
697                        accession_id,
698                    });
699
700                    if !created_accessions_set.contains(&accession_id) {
701                        previous_accessions.insert(accession_id);
702                    }
703                }
704                "annotation_groups" => {
705                    let name = parse_string(item, pk_column);
706
707                    created_annotation_groups.push(AnnotationGroup { name });
708                }
709                "annotation_group_samples" => {
710                    let annotation_group = parse_string(item, 0);
711                    let sample_name = parse_string(item, 1);
712
713                    created_annotation_group_samples.push(AnnotationGroupSample {
714                        annotation_group,
715                        sample_name: sample_name.clone(),
716                    });
717
718                    if !created_samples_set.contains(&sample_name) {
719                        previous_samples.insert(sample_name);
720                    }
721                }
722                t => {
723                    println!("unhandled table {t}")
724                }
725            }
726        }
727    }
728
729    // Process additional dependencies for edges
730    let existing_edges = Edge::query_by_ids(
731        conn,
732        &previous_edges.clone().into_iter().collect::<Vec<_>>(),
733    );
734    let mut new_nodes = vec![];
735    for edge in existing_edges.iter() {
736        if !previous_nodes.contains(&edge.source_node_id) && !is_terminal(edge.source_node_id) {
737            previous_nodes.insert(edge.source_node_id);
738            new_nodes.push(edge.source_node_id);
739        }
740        if !previous_nodes.contains(&edge.target_node_id) && !is_terminal(edge.target_node_id) {
741            previous_nodes.insert(edge.target_node_id);
742            new_nodes.push(edge.target_node_id);
743        }
744    }
745    for node in Node::query_by_ids(conn, &new_nodes) {
746        previous_sequences.insert(node.sequence_hash);
747    }
748
749    let changeset_models = ChangesetModels {
750        sequences: created_sequences,
751        block_groups: created_block_groups,
752        nodes: created_nodes,
753        edges: created_edges,
754        block_group_edges: created_bg_edges,
755        samples: created_samples,
756        collections: created_collections,
757        paths: created_paths,
758        path_edges: created_path_edges,
759        accessions: created_accessions,
760        accession_edges: created_accession_edges,
761        accession_paths: created_accession_paths,
762        annotation_groups: created_annotation_groups,
763        annotations: created_annotations,
764        annotation_group_samples: created_annotation_group_samples,
765    };
766
767    let dependency_models = DependencyModels {
768        collections: Collection::query_by_ids(conn, &previous_collections),
769        samples: Sample::query_by_ids(conn, &previous_samples),
770        sequences: Sequence::query_by_ids(conn, &previous_sequences),
771        block_group: BlockGroup::query_by_ids(conn, &previous_block_groups),
772        nodes: Node::query_by_ids(conn, &previous_nodes),
773        edges: Edge::query_by_ids(conn, &previous_edges),
774        paths: Path::query_by_ids(conn, &previous_paths),
775        accessions: Accession::query_by_ids(conn, &previous_accessions),
776        accession_edges: AccessionEdge::query_by_ids(conn, &previous_accession_edges),
777    };
778
779    (changeset_models, dependency_models)
780}
781
782pub fn apply_changeset(
783    conn: &GraphConnection,
784    changeset: &ChangesetModels,
785    dependencies: &DependencyModels,
786) -> Result<(), ChangesetError> {
787    for collection in dependencies.collections.iter() {
788        Collection::create(conn, &collection.name);
789    }
790
791    for sample in dependencies.samples.iter() {
792        Sample::get_or_create(conn, &sample.name);
793    }
794
795    for sequence in dependencies.sequences.iter() {
796        NewSequence::from(sequence).save(conn);
797    }
798    for node in dependencies.nodes.iter() {
799        if !is_terminal(node.id) {
800            assert!(Sequence::get_by_id(conn, &node.sequence_hash).is_some());
801        }
802    }
803
804    for bg in dependencies.block_group.iter() {
805        let sample_name = bg.sample_name.as_ref().map(|v| v as &str);
806        BlockGroup::create(conn, &bg.collection_name, sample_name, &bg.name);
807    }
808
809    for node in dependencies.nodes.iter() {
810        Node::create(conn, &node.sequence_hash, &node.id);
811    }
812
813    Edge::bulk_create(
814        conn,
815        &dependencies
816            .edges
817            .iter()
818            .map(EdgeData::from)
819            .collect::<Vec<_>>(),
820    );
821
822    for path in dependencies.paths.iter() {
823        Path::create(conn, &path.name, &path.block_group_id, &[]);
824    }
825
826    AccessionEdge::bulk_create(
827        conn,
828        &dependencies
829            .accession_edges
830            .iter()
831            .map(AccessionEdgeData::from)
832            .collect::<Vec<AccessionEdgeData>>(),
833    );
834
835    for accession in dependencies.accessions.iter() {
836        Accession::get_or_create(
837            conn,
838            &accession.name,
839            &accession.path_id,
840            accession.parent_accession_id.as_ref(),
841        );
842    }
843
844    for collection in &changeset.collections {
845        Collection::create(conn, &collection.name);
846    }
847    for sample in &changeset.samples {
848        Sample::get_or_create(conn, &sample.name);
849    }
850    for sequence in &changeset.sequences {
851        NewSequence::from(sequence).save(conn);
852    }
853    for bg in &changeset.block_groups {
854        BlockGroup::create(
855            conn,
856            &bg.collection_name,
857            bg.sample_name.as_deref(),
858            &bg.name,
859        );
860    }
861
862    for node in &changeset.nodes {
863        Node::create(conn, &node.sequence_hash, &node.id);
864    }
865
866    Edge::bulk_create(
867        conn,
868        &changeset
869            .edges
870            .iter()
871            .map(EdgeData::from)
872            .collect::<Vec<_>>(),
873    );
874    BlockGroupEdge::bulk_create(
875        conn,
876        &changeset
877            .block_group_edges
878            .iter()
879            .map(BlockGroupEdgeData::from)
880            .collect::<Vec<BlockGroupEdgeData>>(),
881    );
882
883    for path in &changeset.paths {
884        Path::create(conn, &path.name, &path.block_group_id, &[]);
885        let edges = changeset
886            .path_edges
887            .iter()
888            .filter(|edge| edge.path_id == path.id)
889            .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
890            .map(|path_edge| path_edge.edge_id);
891        PathEdge::bulk_create(conn, &path.id, &edges.collect::<Vec<_>>());
892    }
893
894    AccessionEdge::bulk_create(
895        conn,
896        &changeset
897            .accession_edges
898            .iter()
899            .map(AccessionEdgeData::from)
900            .collect::<Vec<_>>(),
901    );
902
903    for accession in &changeset.accessions {
904        Accession::get_or_create(
905            conn,
906            &accession.name,
907            &accession.path_id,
908            accession.parent_accession_id.as_ref(),
909        );
910        let edges = changeset
911            .accession_paths
912            .iter()
913            .filter(|ap| ap.accession_id == accession.id)
914            .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
915            .map(|ap| ap.edge_id);
916        AccessionPath::create(conn, &accession.id, &edges.collect::<Vec<_>>());
917    }
918
919    for annotation_group in &changeset.annotation_groups {
920        AnnotationGroup::get_or_create(conn, &annotation_group.name).map_err(|err| match err {
921            AnnotationGroupError::DatabaseError(inner) => inner,
922        })?;
923    }
924
925    for annotation in &changeset.annotations {
926        Annotation::get_or_create(
927            conn,
928            &annotation.name,
929            &annotation.group,
930            &annotation.accession_id,
931        )
932        .map_err(|err| match err {
933            AnnotationError::DatabaseError(inner) => inner,
934            AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
935                inner
936            }
937        })?;
938    }
939
940    for annotation_group_sample in &changeset.annotation_group_samples {
941        AnnotationGroupSample::create(
942            conn,
943            &annotation_group_sample.annotation_group,
944            &annotation_group_sample.sample_name,
945        )
946        .map_err(|err| match err {
947            AnnotationError::DatabaseError(inner) => inner,
948            AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
949                inner
950            }
951        })?;
952    }
953    Ok(())
954}
955
956pub fn revert_changeset(
957    conn: &GraphConnection,
958    changeset: &ChangesetModels,
959) -> Result<(), ChangesetError> {
960    for annotation_group_sample in &changeset.annotation_group_samples {
961        AnnotationGroupSample::delete(
962            conn,
963            &annotation_group_sample.annotation_group,
964            &annotation_group_sample.sample_name,
965        )
966        .map_err(|err| match err {
967            AnnotationError::DatabaseError(inner) => inner,
968            AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
969                inner
970            }
971        })?;
972    }
973    Annotation::delete_by_ids(
974        conn,
975        &changeset
976            .annotations
977            .iter()
978            .map(|obj| obj.id)
979            .collect::<Vec<_>>(),
980    );
981    AnnotationGroup::delete_by_ids(
982        conn,
983        &changeset
984            .annotation_groups
985            .iter()
986            .map(|group| group.name.clone())
987            .collect::<Vec<_>>(),
988    );
989    AccessionPath::delete_by_ids(
990        conn,
991        &changeset
992            .accession_paths
993            .iter()
994            .map(|obj| obj.id)
995            .collect::<Vec<_>>(),
996    );
997    // TODO: edges can be made by other operations in other dbs earlier and reused, so we likely want to allow FK failures delete for deletions
998    AccessionEdge::delete_by_ids(
999        conn,
1000        &changeset
1001            .accession_edges
1002            .iter()
1003            .map(|pe| pe.id)
1004            .collect::<Vec<_>>(),
1005    );
1006    Accession::delete_by_ids(
1007        conn,
1008        &changeset
1009            .accessions
1010            .iter()
1011            .map(|obj| obj.id)
1012            .collect::<Vec<_>>(),
1013    );
1014    PathEdge::delete_by_ids(
1015        conn,
1016        &changeset
1017            .path_edges
1018            .iter()
1019            .map(|pe| pe.id)
1020            .collect::<Vec<_>>(),
1021    );
1022    Path::delete_by_ids(
1023        conn,
1024        &changeset.paths.iter().map(|p| p.id).collect::<Vec<_>>(),
1025    );
1026
1027    BlockGroupEdge::delete_by_ids(
1028        conn,
1029        &changeset
1030            .block_group_edges
1031            .iter()
1032            .map(|obj| obj.id)
1033            .collect::<Vec<_>>(),
1034    );
1035    Edge::delete_by_ids(
1036        conn,
1037        &changeset.edges.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1038    );
1039
1040    BlockGroup::delete_by_ids(
1041        conn,
1042        &changeset
1043            .block_groups
1044            .iter()
1045            .map(|obj| obj.id)
1046            .collect::<Vec<_>>(),
1047    );
1048
1049    Node::delete_by_ids(
1050        conn,
1051        &changeset.nodes.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1052    );
1053    Collection::delete_by_ids(
1054        conn,
1055        &changeset
1056            .collections
1057            .iter()
1058            .map(|obj| obj.name.clone())
1059            .collect::<Vec<_>>(),
1060    );
1061    Sample::delete_by_ids(
1062        conn,
1063        &changeset
1064            .samples
1065            .iter()
1066            .map(|obj| obj.name.clone())
1067            .collect::<Vec<_>>(),
1068    );
1069    Sequence::delete_by_ids(
1070        conn,
1071        &changeset
1072            .sequences
1073            .iter()
1074            .map(|obj| obj.hash)
1075            .collect::<Vec<_>>(),
1076    );
1077    Ok(())
1078}
1079
1080pub fn get_changeset_from_path(path: PathBuf) -> DatabaseChangeset {
1081    use capnp::serialize_packed;
1082    let file = fs::File::open(path).unwrap();
1083    let mut reader = std::io::BufReader::new(file);
1084
1085    let message_reader =
1086        serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1087    let root = message_reader
1088        .get_root::<database_changeset::Reader>()
1089        .unwrap();
1090    DatabaseChangeset::read_capnp(root)
1091}
1092
1093pub fn get_changeset_dependencies_from_path(path: PathBuf) -> DependencyModels {
1094    use capnp::serialize_packed;
1095
1096    let file = fs::File::open(path).unwrap();
1097    let mut reader = std::io::BufReader::new(file);
1098    let message_reader =
1099        serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1100    let root = message_reader
1101        .get_root::<crate::gen_models_capnp::dependency_models::Reader>()
1102        .unwrap();
1103    DependencyModels::read_capnp(root)
1104}
1105
1106pub fn write_changeset(
1107    workspace: &Workspace,
1108    operation: &Operation,
1109    changes: DatabaseChangeset,
1110    dependencies: &DependencyModels,
1111) {
1112    use capnp::{message::Builder, serialize_packed};
1113
1114    let change_path = operation.get_changeset_path(workspace);
1115    let dependency_path = operation.get_changeset_dependencies_path(workspace);
1116
1117    // Write dependencies using capnp
1118    let mut dependency_file = fs::File::create_new(&dependency_path)
1119        .unwrap_or_else(|_| panic!("Unable to open {dependency_path:?}"));
1120    let mut message = Builder::new_default();
1121    let mut dep_root = message.init_root();
1122    dependencies.write_capnp(&mut dep_root);
1123    serialize_packed::write_message(&mut dependency_file, &message).unwrap();
1124
1125    // Write changes using capnp
1126    let mut file = fs::File::create_new(&change_path)
1127        .unwrap_or_else(|_| panic!("Unable to open {change_path:?}"));
1128    let mut message = Builder::new_default();
1129    let mut change_root = message.init_root();
1130    changes.write_capnp(&mut change_root);
1131    serialize_packed::write_message(&mut file, &message).unwrap();
1132}
1133
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;
    use crate::{
        file_types::FileTypes,
        operations::{OperationFile, OperationInfo},
        session_operations::{end_operation, start_operation},
        test_helpers::{setup_block_group, setup_gen},
        traits::Query,
    };

    /// Builds a `ChangesetModels` fixture with at least one record in every
    /// collection, shared by the Cap'n Proto round-trip tests.
    fn populated_changeset_models() -> ChangesetModels {
        ChangesetModels {
            collections: vec![crate::collection::Collection {
                name: "test_collection".to_string(),
            }],
            samples: vec![crate::sample::Sample {
                name: "test_sample".to_string(),
            }],
            sequences: vec![
                NewSequence::new()
                    .sequence("ATCG")
                    .sequence_type("DNA")
                    .name("test_seq")
                    .build(),
            ],
            block_groups: vec![BlockGroup {
                id: HashId::pad_str(1),
                collection_name: "test_collection".to_string(),
                sample_name: Some("test_sample".to_string()),
                name: "test_bg".to_string(),
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            nodes: vec![
                Node {
                    id: HashId::convert_str("node_hash"),
                    sequence_hash: HashId::convert_str("test_hash"),
                },
                Node {
                    id: HashId::convert_str("node_hash_2"),
                    sequence_hash: HashId::convert_str("test_hash_2"),
                },
            ],
            edges: vec![Edge {
                id: HashId::pad_str(1),
                source_node_id: HashId::convert_str("1"),
                source_coordinate: 0,
                source_strand: Strand::Forward,
                target_node_id: HashId::convert_str("2"),
                target_coordinate: 0,
                target_strand: Strand::Forward,
            }],
            block_group_edges: vec![crate::block_group_edge::BlockGroupEdge {
                id: HashId::pad_str(1),
                block_group_id: HashId::pad_str(1),
                edge_id: HashId::pad_str(1),
                chromosome_index: 0,
                phased: 0,
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            paths: vec![Path {
                id: HashId::pad_str(1),
                block_group_id: HashId::pad_str(1),
                name: "test_path".to_string(),
                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
            }],
            path_edges: vec![PathEdge {
                id: HashId::pad_str(1),
                path_id: HashId::pad_str(1),
                index_in_path: 0,
                edge_id: HashId::pad_str(1),
            }],
            accessions: vec![Accession {
                id: HashId::pad_str(1),
                name: "test_accession".to_string(),
                path_id: HashId::pad_str(1),
                parent_accession_id: None,
            }],
            accession_edges: vec![AccessionEdge {
                id: HashId::pad_str(1),
                source_node_id: HashId::convert_str("1"),
                source_coordinate: 0,
                source_strand: Strand::Forward,
                target_node_id: HashId::convert_str("2"),
                target_coordinate: 0,
                target_strand: Strand::Forward,
                chromosome_index: 0,
            }],
            accession_paths: vec![AccessionPath {
                id: HashId::pad_str(1),
                accession_id: HashId::pad_str(1),
                index_in_path: 0,
                edge_id: HashId::pad_str(1),
            }],
            annotation_groups: vec![AnnotationGroup {
                name: "gff3".to_string(),
            }],
            annotations: vec![Annotation {
                id: HashId::pad_str(1),
                name: "test_annotation".to_string(),
                group: "gff3".to_string(),
                accession_id: HashId::pad_str(1),
            }],
            annotation_group_samples: vec![AnnotationGroupSample {
                annotation_group: "gff3".to_string(),
                sample_name: "test_sample".to_string(),
            }],
        }
    }

    /// A fully populated `DatabaseChangeset` survives a write/read round trip
    /// through its Cap'n Proto representation.
    #[test]
    fn test_database_changeset_capnp_serialization() {
        use capnp::message::TypedBuilder;

        let changeset = DatabaseChangeset {
            db_path: "/path/to/db".to_string(),
            changes: populated_changeset_models(),
        };

        let mut message = TypedBuilder::<database_changeset::Owned>::new_default();
        let mut root = message.init_root();
        changeset.write_capnp(&mut root);

        let deserialized = DatabaseChangeset::read_capnp(root.into_reader());
        assert_eq!(changeset, deserialized);
    }

    /// `DatabaseChangeset::get_db_path` returns the `db_path` stored in a
    /// packed changeset file on disk.
    #[test]
    fn test_database_changeset_get_db_path() {
        use std::io::Write;

        use capnp::{message::Builder, serialize_packed};
        use tempfile::NamedTempFile;

        let expected_db_path = "/tmp/test_db_path";
        let changeset = DatabaseChangeset {
            db_path: expected_db_path.to_string(),
            changes: ChangesetModels {
                collections: vec![],
                samples: vec![],
                sequences: vec![],
                block_groups: vec![],
                nodes: vec![],
                edges: vec![],
                block_group_edges: vec![],
                paths: vec![],
                path_edges: vec![],
                accessions: vec![],
                accession_edges: vec![],
                accession_paths: vec![],
                annotation_groups: vec![],
                annotations: vec![],
                annotation_group_samples: vec![],
            },
        };

        let mut temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        let mut message = Builder::new_default();
        let mut root = message.init_root();
        changeset.write_capnp(&mut root);
        serialize_packed::write_message(&mut temp_file, &message).unwrap();
        temp_file.flush().unwrap();

        let db_path = DatabaseChangeset::get_db_path(path.as_path());

        assert_eq!(db_path, expected_db_path);
    }

    /// A fully populated `ChangesetModels` survives a write/read round trip
    /// through its Cap'n Proto representation.
    #[test]
    fn test_changeset_models_capnp_serialization() {
        use capnp::message::TypedBuilder;

        let changeset_models = populated_changeset_models();

        let mut message = TypedBuilder::<changeset_models::Owned>::new_default();
        let mut root = message.init_root();
        changeset_models.write_capnp(&mut root);

        let deserialized = ChangesetModels::read_capnp(root.into_reader());

        assert_eq!(changeset_models, deserialized);
    }

    /// Annotations created during an operation appear in its changeset, and
    /// the pre-existing sample and accession they refer to are recorded as
    /// dependencies.
    #[test]
    fn test_changeset_includes_annotations() {
        use crate::block_group::PathCache;

        let context = setup_gen();
        let conn = context.graph().conn();
        let op_conn = context.operations().conn();

        let db_uuid = crate::metadata::get_db_uuid(conn);
        crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path").unwrap();

        // Everything created before `start_operation` is outside the session
        // and can therefore only show up as a dependency.
        let _ = Sample::create(conn, "sample-1").unwrap();
        let (block_group_id, path) = setup_block_group(conn);
        let mut cache = PathCache::new(conn);
        let _ = PathCache::lookup(&mut cache, &block_group_id, path.name.clone());
        let accession = BlockGroup::add_accession(conn, &path, "ann-accession", 0, 5, &mut cache);

        let mut session = start_operation(conn);
        let annotation = Annotation::get_or_create(conn, "gene-a", "gff3", &accession.id).unwrap();
        annotation.add_samples(conn, &["sample-1"]).unwrap();

        let operation = end_operation(
            &context,
            &mut session,
            &OperationInfo {
                files: vec![],
                description: "annotation op".to_string(),
            },
            "annotation op",
            None,
        )
        .unwrap();

        let changeset = operation.get_changeset(context.workspace());
        assert_eq!(
            changeset.changes.annotation_groups,
            vec![AnnotationGroup {
                name: "gff3".to_string(),
            }]
        );
        assert_eq!(changeset.changes.annotations, vec![annotation.clone()]);
        assert_eq!(
            changeset.changes.annotation_group_samples,
            vec![AnnotationGroupSample {
                annotation_group: annotation.group.clone(),
                sample_name: "sample-1".to_string(),
            }]
        );

        let dependencies = operation.get_changeset_dependencies(context.workspace());
        assert_eq!(dependencies.samples.len(), 1);
        assert_eq!(dependencies.samples[0].name, "sample-1");
        assert_eq!(dependencies.accessions.len(), 1);
        assert_eq!(dependencies.accessions[0].id, accession.id);
    }

    #[cfg(test)]
    mod changeset_dependencies {
        use super::*;

        /// Linking a new block group to an edge that existed before the
        /// session records that edge's target node, and the node's sequence,
        /// as dependencies.
        #[test]
        fn test_tracks_nodes_and_sequences_from_previous_block_group_edges() {
            let context = setup_gen();
            let conn = context.graph().conn();
            let op_conn = context.operations().conn();

            let db_uuid = crate::metadata::get_db_uuid(conn);
            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
                .unwrap();

            let (bg_id, _path_id) = setup_block_group(conn);
            let old_edges = BlockGroupEdge::edges_for_block_group(conn, &bg_id);

            let mut session = start_operation(conn);
            // make a blockgroup with an edge from our parent blockgroup
            let _ = Sample::create(conn, "new").unwrap();
            let new_bg = BlockGroup::create(conn, "test", Some("new"), "new-bg");
            let shared_edge = old_edges[0].edge.clone();
            BlockGroupEdge::bulk_create(
                conn,
                &[BlockGroupEdgeData {
                    block_group_id: new_bg.id,
                    edge_id: shared_edge.id,
                    chromosome_index: 0,
                    phased: 0,
                }],
            );
            let operation = end_operation(
                &context,
                &mut session,
                &OperationInfo {
                    files: vec![],
                    description: "test".to_string(),
                },
                "test",
                None,
            )
            .unwrap();

            let dependencies = operation.get_changeset_dependencies(context.workspace());
            // Check lengths before indexing so a mismatch fails with the
            // length assertion rather than an out-of-bounds panic.
            assert_eq!(dependencies.nodes.len(), 1);
            assert_eq!(dependencies.nodes[0].id, shared_edge.target_node_id);
            let nodes = Node::query_by_ids(conn, &[shared_edge.target_node_id]);
            assert_eq!(dependencies.sequences.len(), 1);
            assert_eq!(dependencies.sequences[0].hash, nodes[0].sequence_hash);
        }

        /// An edge created in-session that targets a pre-existing node, and is
        /// attached to a pre-existing block group, records one dependency
        /// sequence and that block group as dependencies.
        #[test]
        fn test_records_patch_dependencies() {
            let context = setup_gen();
            let conn = context.graph().conn();
            let op_conn = context.operations().conn();

            let db_uuid = crate::metadata::get_db_uuid(conn);
            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
                .unwrap();

            // create some stuff before we attach to our main session that will be required as extra information
            let (bg_id, _path_id) = setup_block_group(conn);
            let dep_bg = BlockGroup::get_by_id(conn, &bg_id);

            let existing_seq = Sequence::new()
                .sequence_type("DNA")
                .sequence("AAAATTTT")
                .save(conn);
            let existing_node_id =
                Node::create(conn, &existing_seq.hash, &HashId::convert_str("1"));

            let mut session = start_operation(conn);

            let random_seq = Sequence::new()
                .sequence_type("DNA")
                .sequence("ATCG")
                .save(conn);
            let random_node_id = Node::create(conn, &random_seq.hash, &HashId::convert_str("2"));

            let new_edge = Edge::create(
                conn,
                random_node_id,
                0,
                Strand::Forward,
                existing_node_id,
                0,
                Strand::Forward,
            );
            let block_group_edge = BlockGroupEdgeData {
                block_group_id: bg_id,
                edge_id: new_edge.id,
                chromosome_index: 0,
                phased: 0,
            };
            BlockGroupEdge::bulk_create(conn, &[block_group_edge]);
            let operation = end_operation(
                &context,
                &mut session,
                &OperationInfo {
                    files: vec![OperationFile {
                        file_path: "test".to_string(),
                        file_type: FileTypes::None,
                    }],
                    description: "test".to_string(),
                },
                "test",
                None,
            )
            .unwrap();

            let dependencies = operation.get_changeset_dependencies(context.workspace());
            assert_eq!(dependencies.sequences.len(), 1);
            assert_eq!(
                dependencies.block_group[0].collection_name,
                dep_bg.collection_name
            );
            assert_eq!(dependencies.block_group[0].name, dep_bg.name);
            assert_eq!(dependencies.block_group[0].sample_name, dep_bg.sample_name);
        }
    }
}