Skip to main content

gen_models/
changesets.rs

1use std::{
2    collections::{HashMap, HashSet},
3    convert::TryInto,
4    fs,
5    io::Read,
6    path::{Path as StdPath, PathBuf},
7    str,
8};
9
10use gen_core::{HashId, Strand, config::Workspace, is_terminal, traits::Capnp};
11use itertools::Itertools;
12use rusqlite::{
13    session::{ChangesetItem, ChangesetIter},
14    types::FromSql,
15};
16use serde::{Deserialize, Serialize};
17
18use crate::{
19    accession::{Accession, AccessionEdge, AccessionEdgeData, AccessionPath},
20    annotations::{
21        Annotation, AnnotationError, AnnotationGroup, AnnotationGroupError, AnnotationGroupSample,
22    },
23    block_group::{BlockGroup, NewBlockGroup},
24    block_group_edge::{BlockGroupEdge, BlockGroupEdgeData},
25    collection::Collection,
26    db::GraphConnection,
27    edge::{Edge, EdgeData},
28    errors::ChangesetError,
29    gen_models_capnp::{changeset_models, database_changeset},
30    node::Node,
31    operations::Operation,
32    path::Path,
33    path_edge::PathEdge,
34    sample::Sample,
35    sample_lineage::SampleLineage,
36    sequence::{NewSequence, Sequence},
37    session_operations::DependencyModels,
38    traits::Query,
39};
40
41#[derive(Debug, Deserialize, Serialize, PartialEq)]
42pub struct DatabaseChangeset {
43    pub db_path: String,
44    pub changes: ChangesetModels,
45}
46
47impl DatabaseChangeset {
48    pub fn get_db_path(path: &StdPath) -> String {
49        use capnp::serialize_packed;
50
51        let file = fs::File::open(path).unwrap();
52        let mut reader = std::io::BufReader::new(file);
53
54        let message_reader =
55            serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new())
56                .unwrap();
57        let root = message_reader
58            .get_root::<database_changeset::Reader>()
59            .unwrap();
60        root.get_db_path().unwrap().to_string().unwrap()
61    }
62}
63
64impl<'a> Capnp<'a> for DatabaseChangeset {
65    type Builder = database_changeset::Builder<'a>;
66    type Reader = database_changeset::Reader<'a>;
67
68    fn write_capnp(&self, builder: &mut Self::Builder) {
69        builder.set_db_path(&self.db_path);
70        let mut changeset_builder = builder.reborrow().init_changes();
71        self.changes.write_capnp(&mut changeset_builder);
72    }
73
74    fn read_capnp(reader: Self::Reader) -> Self {
75        let db_path = reader.get_db_path().unwrap().to_string().unwrap();
76        let changeset_reader = reader.get_changes().unwrap();
77        let changes = ChangesetModels::read_capnp(changeset_reader);
78        DatabaseChangeset { db_path, changes }
79    }
80}
81
82#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
83pub struct ChangesetModels {
84    pub collections: Vec<crate::collection::Collection>,
85    pub samples: Vec<crate::sample::Sample>,
86    pub sample_lineages: Vec<SampleLineage>,
87    pub sequences: Vec<Sequence>,
88    pub block_groups: Vec<BlockGroup>,
89    pub nodes: Vec<Node>,
90    pub edges: Vec<Edge>,
91    pub block_group_edges: Vec<crate::block_group_edge::BlockGroupEdge>,
92    pub paths: Vec<Path>,
93    pub path_edges: Vec<PathEdge>,
94    pub accessions: Vec<Accession>,
95    pub accession_edges: Vec<AccessionEdge>,
96    pub accession_paths: Vec<AccessionPath>,
97    pub annotation_groups: Vec<AnnotationGroup>,
98    pub annotations: Vec<Annotation>,
99    pub annotation_group_samples: Vec<AnnotationGroupSample>,
100}
101
102impl<'a> Capnp<'a> for ChangesetModels {
103    type Builder = changeset_models::Builder<'a>;
104    type Reader = changeset_models::Reader<'a>;
105
106    fn write_capnp(&self, builder: &mut Self::Builder) {
107        // Write collections
108        let mut collections_builder = builder
109            .reborrow()
110            .init_collections(self.collections.len() as u32);
111        for (i, collection) in self.collections.iter().enumerate() {
112            let mut collection_builder = collections_builder.reborrow().get(i as u32);
113            collection.write_capnp(&mut collection_builder);
114        }
115
116        // Write samples
117        let mut samples_builder = builder.reborrow().init_samples(self.samples.len() as u32);
118        for (i, sample) in self.samples.iter().enumerate() {
119            let mut sample_builder = samples_builder.reborrow().get(i as u32);
120            sample.write_capnp(&mut sample_builder);
121        }
122
123        let mut sample_lineages_builder = builder
124            .reborrow()
125            .init_sample_lineages(self.sample_lineages.len() as u32);
126        for (i, sample_lineage) in self.sample_lineages.iter().enumerate() {
127            let mut sample_lineage_builder = sample_lineages_builder.reborrow().get(i as u32);
128            sample_lineage.write_capnp(&mut sample_lineage_builder);
129        }
130
131        // Write sequences
132        let mut sequences_builder = builder
133            .reborrow()
134            .init_sequences(self.sequences.len() as u32);
135        for (i, sequence) in self.sequences.iter().enumerate() {
136            let mut sequence_builder = sequences_builder.reborrow().get(i as u32);
137            sequence.write_capnp(&mut sequence_builder);
138        }
139
140        // Write block groups
141        let mut block_groups_builder = builder
142            .reborrow()
143            .init_block_groups(self.block_groups.len() as u32);
144        for (i, block_group) in self.block_groups.iter().enumerate() {
145            let mut block_group_builder = block_groups_builder.reborrow().get(i as u32);
146            block_group.write_capnp(&mut block_group_builder);
147        }
148
149        // Write nodes
150        let mut nodes_builder = builder.reborrow().init_nodes(self.nodes.len() as u32);
151        for (i, node) in self.nodes.iter().enumerate() {
152            let mut node_builder = nodes_builder.reborrow().get(i as u32);
153            node.write_capnp(&mut node_builder);
154        }
155
156        // Write edges
157        let mut edges_builder = builder.reborrow().init_edges(self.edges.len() as u32);
158        for (i, edge) in self.edges.iter().enumerate() {
159            let mut edge_builder = edges_builder.reborrow().get(i as u32);
160            edge.write_capnp(&mut edge_builder);
161        }
162
163        // Write block group edges
164        let mut block_group_edges_builder = builder
165            .reborrow()
166            .init_block_group_edges(self.block_group_edges.len() as u32);
167        for (i, block_group_edge) in self.block_group_edges.iter().enumerate() {
168            let mut block_group_edge_builder = block_group_edges_builder.reborrow().get(i as u32);
169            block_group_edge.write_capnp(&mut block_group_edge_builder);
170        }
171
172        // Write paths
173        let mut paths_builder = builder.reborrow().init_paths(self.paths.len() as u32);
174        for (i, path) in self.paths.iter().enumerate() {
175            let mut path_builder = paths_builder.reborrow().get(i as u32);
176            path.write_capnp(&mut path_builder);
177        }
178
179        // Write path edges
180        let mut path_edges_builder = builder
181            .reborrow()
182            .init_path_edges(self.path_edges.len() as u32);
183        for (i, path_edge) in self.path_edges.iter().enumerate() {
184            let mut path_edge_builder = path_edges_builder.reborrow().get(i as u32);
185            path_edge.write_capnp(&mut path_edge_builder);
186        }
187
188        // Write accessions
189        let mut accessions_builder = builder
190            .reborrow()
191            .init_accessions(self.accessions.len() as u32);
192        for (i, accession) in self.accessions.iter().enumerate() {
193            let mut accession_builder = accessions_builder.reborrow().get(i as u32);
194            accession.write_capnp(&mut accession_builder);
195        }
196
197        // Write accession edges
198        let mut accession_edges_builder = builder
199            .reborrow()
200            .init_accession_edges(self.accession_edges.len() as u32);
201        for (i, accession_edge) in self.accession_edges.iter().enumerate() {
202            let mut accession_edge_builder = accession_edges_builder.reborrow().get(i as u32);
203            accession_edge.write_capnp(&mut accession_edge_builder);
204        }
205
206        // Write accession paths
207        let mut accession_paths_builder = builder
208            .reborrow()
209            .init_accession_paths(self.accession_paths.len() as u32);
210        for (i, accession_path) in self.accession_paths.iter().enumerate() {
211            let mut accession_path_builder = accession_paths_builder.reborrow().get(i as u32);
212            accession_path.write_capnp(&mut accession_path_builder);
213        }
214
215        // Write annotation groups
216        let mut annotation_groups_builder = builder
217            .reborrow()
218            .init_annotation_groups(self.annotation_groups.len() as u32);
219        for (i, annotation_group) in self.annotation_groups.iter().enumerate() {
220            let mut annotation_group_builder = annotation_groups_builder.reborrow().get(i as u32);
221            annotation_group.write_capnp(&mut annotation_group_builder);
222        }
223
224        // Write annotations
225        let mut annotations_builder = builder
226            .reborrow()
227            .init_annotations(self.annotations.len() as u32);
228        for (i, annotation) in self.annotations.iter().enumerate() {
229            let mut annotation_builder = annotations_builder.reborrow().get(i as u32);
230            annotation.write_capnp(&mut annotation_builder);
231        }
232
233        // Write annotation group samples
234        let mut annotation_group_samples_builder = builder
235            .reborrow()
236            .init_annotation_group_samples(self.annotation_group_samples.len() as u32);
237        for (i, annotation_group_sample) in self.annotation_group_samples.iter().enumerate() {
238            let mut annotation_group_sample_builder =
239                annotation_group_samples_builder.reborrow().get(i as u32);
240            annotation_group_sample.write_capnp(&mut annotation_group_sample_builder);
241        }
242    }
243
244    fn read_capnp(reader: Self::Reader) -> Self {
245        // Read collections
246        let collections_reader = reader.get_collections().unwrap();
247        let mut collections = Vec::new();
248        for collection_reader in collections_reader.iter() {
249            collections.push(crate::collection::Collection::read_capnp(collection_reader));
250        }
251
252        // Read samples
253        let samples_reader = reader.get_samples().unwrap();
254        let mut samples = Vec::new();
255        for sample_reader in samples_reader.iter() {
256            samples.push(crate::sample::Sample::read_capnp(sample_reader));
257        }
258
259        let sample_lineages_reader = reader.get_sample_lineages().unwrap();
260        let mut sample_lineages = Vec::new();
261        for sample_lineage_reader in sample_lineages_reader.iter() {
262            sample_lineages.push(SampleLineage::read_capnp(sample_lineage_reader));
263        }
264
265        // Read sequences
266        let sequences_reader = reader.get_sequences().unwrap();
267        let mut sequences = Vec::new();
268        for sequence_reader in sequences_reader.iter() {
269            sequences.push(Sequence::read_capnp(sequence_reader));
270        }
271
272        // Read block groups
273        let block_groups_reader = reader.get_block_groups().unwrap();
274        let mut block_groups = Vec::new();
275        for block_group_reader in block_groups_reader.iter() {
276            block_groups.push(BlockGroup::read_capnp(block_group_reader));
277        }
278
279        // Read nodes
280        let nodes_reader = reader.get_nodes().unwrap();
281        let mut nodes = Vec::new();
282        for node_reader in nodes_reader.iter() {
283            nodes.push(Node::read_capnp(node_reader));
284        }
285
286        // Read edges
287        let edges_reader = reader.get_edges().unwrap();
288        let mut edges = Vec::new();
289        for edge_reader in edges_reader.iter() {
290            edges.push(Edge::read_capnp(edge_reader));
291        }
292
293        // Read block group edges
294        let block_group_edges_reader = reader.get_block_group_edges().unwrap();
295        let mut block_group_edges = Vec::new();
296        for block_group_edge_reader in block_group_edges_reader.iter() {
297            block_group_edges.push(crate::block_group_edge::BlockGroupEdge::read_capnp(
298                block_group_edge_reader,
299            ));
300        }
301
302        // Read paths
303        let paths_reader = reader.get_paths().unwrap();
304        let mut paths = Vec::new();
305        for path_reader in paths_reader.iter() {
306            paths.push(Path::read_capnp(path_reader));
307        }
308
309        // Read path edges
310        let path_edges_reader = reader.get_path_edges().unwrap();
311        let mut path_edges = Vec::new();
312        for path_edge_reader in path_edges_reader.iter() {
313            path_edges.push(PathEdge::read_capnp(path_edge_reader));
314        }
315
316        // Read accessions
317        let accessions_reader = reader.get_accessions().unwrap();
318        let mut accessions = Vec::new();
319        for accession_reader in accessions_reader.iter() {
320            accessions.push(Accession::read_capnp(accession_reader));
321        }
322
323        // Read accession edges
324        let accession_edges_reader = reader.get_accession_edges().unwrap();
325        let mut accession_edges = Vec::new();
326        for accession_edge_reader in accession_edges_reader.iter() {
327            accession_edges.push(AccessionEdge::read_capnp(accession_edge_reader));
328        }
329
330        // Read accession paths
331        let accession_paths_reader = reader.get_accession_paths().unwrap();
332        let mut accession_paths = Vec::new();
333        for accession_path_reader in accession_paths_reader.iter() {
334            accession_paths.push(AccessionPath::read_capnp(accession_path_reader));
335        }
336
337        // Read annotation groups
338        let annotation_groups_reader = reader.get_annotation_groups().unwrap();
339        let mut annotation_groups = Vec::new();
340        for annotation_group_reader in annotation_groups_reader.iter() {
341            annotation_groups.push(AnnotationGroup::read_capnp(annotation_group_reader));
342        }
343
344        // Read annotations
345        let annotations_reader = reader.get_annotations().unwrap();
346        let mut annotations = Vec::new();
347        for annotation_reader in annotations_reader.iter() {
348            annotations.push(Annotation::read_capnp(annotation_reader));
349        }
350
351        // Read annotation group samples
352        let annotation_group_samples_reader = reader.get_annotation_group_samples().unwrap();
353        let mut annotation_group_samples = Vec::new();
354        for annotation_group_sample_reader in annotation_group_samples_reader.iter() {
355            annotation_group_samples.push(AnnotationGroupSample::read_capnp(
356                annotation_group_sample_reader,
357            ));
358        }
359
360        ChangesetModels {
361            collections,
362            samples,
363            sample_lineages,
364            sequences,
365            block_groups,
366            nodes,
367            edges,
368            block_group_edges,
369            paths,
370            path_edges,
371            accessions,
372            accession_edges,
373            accession_paths,
374            annotation_groups,
375            annotations,
376            annotation_group_samples,
377        }
378    }
379}
380
381// Helper functions for parsing changeset items
382pub fn parse_string(item: &ChangesetItem, col: usize) -> String {
383    str::from_utf8(item.new_value(col).unwrap().as_bytes().unwrap())
384        .unwrap()
385        .to_string()
386}
387
388pub fn parse_maybe_string(item: &ChangesetItem, col: usize) -> Option<String> {
389    item.new_value(col)
390        .unwrap()
391        .as_bytes_or_null()
392        .unwrap()
393        .map(|v| str::from_utf8(v).unwrap().to_string())
394}
395
396pub fn parse_blob(item: &ChangesetItem, col: usize) -> [u8; 32] {
397    let bytes = item.new_value(col).unwrap().as_bytes().unwrap();
398
399    bytes.try_into().expect("blob must be exactly 32 bytes")
400}
401
402pub fn parse_maybe_blob(item: &ChangesetItem, col: usize) -> Option<[u8; 32]> {
403    item.new_value(col)
404        .unwrap()
405        .as_bytes_or_null()
406        .unwrap()
407        .map(|v| v.try_into().expect("blob must be exactly 32 bytes"))
408}
409
410pub fn parse_hashid(item: &ChangesetItem, col: usize) -> HashId {
411    HashId(parse_blob(item, col))
412}
413
414pub fn parse_maybe_hashid(item: &ChangesetItem, col: usize) -> Option<HashId> {
415    parse_maybe_blob(item, col).map(HashId)
416}
417
418pub fn parse_number(item: &ChangesetItem, col: usize) -> i64 {
419    item.new_value(col).unwrap().as_i64().unwrap()
420}
421
422pub fn parse_maybe_number(item: &ChangesetItem, col: usize) -> Option<i64> {
423    item.new_value(col).unwrap().as_i64_or_null().unwrap()
424}
425
426pub fn process_changesetiter(
427    conn: &GraphConnection,
428    mut changes: &[u8],
429) -> (ChangesetModels, DependencyModels) {
430    use fallible_streaming_iterator::FallibleStreamingIterator;
431    use gen_core::is_terminal;
432    use itertools::Itertools;
433
434    use crate::{
435        accession::{Accession, AccessionEdge},
436        block_group::BlockGroup,
437        edge::Edge,
438        node::Node,
439        path::Path,
440        sequence::Sequence,
441        traits::Query,
442    };
443
444    // Initialize collections for changeset models
445    let mut created_block_groups = vec![];
446    let mut created_edges = vec![];
447    let mut created_nodes = vec![];
448    let mut created_sequences = vec![];
449    let mut created_bg_edges = vec![];
450    let mut created_samples = vec![];
451    let mut created_sample_lineages = vec![];
452    let mut created_collections = vec![];
453    let mut created_paths = vec![];
454    let mut created_path_edges = vec![];
455    let mut created_accessions = vec![];
456    let mut created_accession_edges = vec![];
457    let mut created_accession_paths = vec![];
458    let mut created_annotation_groups = vec![];
459    let mut created_annotations = vec![];
460    let mut created_annotation_group_samples = vec![];
461
462    // Initialize collections for dependency tracking
463    let mut previous_collections = HashSet::new();
464    let mut previous_samples = HashSet::new();
465    let mut previous_block_groups = HashSet::new();
466    let mut previous_edges = HashSet::new();
467    let mut previous_paths = HashSet::new();
468    let mut previous_accessions = HashSet::new();
469    let mut previous_nodes = HashSet::new();
470    let mut previous_sequences = HashSet::new();
471    let mut previous_accession_edges = HashSet::new();
472    let mut created_block_groups_set = HashSet::new();
473    let mut created_paths_set = HashSet::new();
474    let mut created_accessions_set = HashSet::new();
475    let mut created_edges_set = HashSet::new();
476    let mut created_accession_edges_set = HashSet::new();
477    let mut created_nodes_set = HashSet::new();
478    let mut created_sequences_set = HashSet::new();
479    let mut created_samples_set: HashSet<String> = HashSet::new();
480    let mut created_collections_set: HashSet<String> = HashSet::new();
481
482    let input: &mut dyn Read = &mut changes;
483    let mut iter = ChangesetIter::start_strm(&input).unwrap();
484
485    while let Some(item) = iter.next().unwrap() {
486        let op = item.op().unwrap();
487        // info on indirect changes: https://www.sqlite.org/draft/session/sqlite3session_indirect.html
488        if !op.indirect() {
489            let table = op.table_name();
490            let pk_column = item
491                .pk()
492                .unwrap()
493                .iter()
494                .find_position(|item| **item == 1)
495                .unwrap()
496                .0;
497            match table {
498                "sequences" => {
499                    let hash = parse_hashid(item, pk_column);
500                    let sequence = Sequence::new()
501                        .sequence_type(&parse_string(item, 1))
502                        .sequence(&parse_string(item, 2))
503                        .name(&parse_string(item, 3))
504                        .file_path(&parse_string(item, 4))
505                        .length(parse_number(item, 5))
506                        .build();
507                    assert_eq!(hash, sequence.hash);
508                    created_sequences.push(sequence);
509                    created_sequences_set.insert(hash);
510                }
511                "block_groups" => {
512                    let bg_pk = HashId(parse_blob(item, pk_column));
513                    let collection = parse_string(item, 1);
514                    let sample_name = parse_string(item, 2);
515                    let name = parse_string(item, 3);
516                    let created_on = parse_number(item, 4);
517                    let parent_block_group_id = parse_maybe_hashid(item, 5);
518
519                    created_block_groups.push(BlockGroup {
520                        id: bg_pk,
521                        collection_name: collection.clone(),
522                        sample_name: sample_name.clone(),
523                        name,
524                        created_on,
525                        parent_block_group_id,
526                        is_default: parse_number(item, 6) != 0,
527                    });
528
529                    created_block_groups_set.insert(bg_pk);
530                    if let Some(parent_block_group_id) = parent_block_group_id
531                        && !created_block_groups_set.contains(&parent_block_group_id)
532                    {
533                        previous_block_groups.insert(parent_block_group_id);
534                    }
535                    if !created_collections_set.contains(&collection) {
536                        previous_collections.insert(collection);
537                    }
538                    if !created_samples_set.contains(&sample_name) {
539                        previous_samples.insert(sample_name);
540                    }
541                }
542                "nodes" => {
543                    let node_id = parse_hashid(item, pk_column);
544                    let sequence_hash = parse_hashid(item, 1);
545
546                    created_nodes.push(Node {
547                        id: node_id,
548                        sequence_hash,
549                    });
550
551                    created_nodes_set.insert(node_id);
552                    if !created_sequences_set.contains(&sequence_hash) {
553                        previous_sequences.insert(sequence_hash);
554                    }
555                }
556                "edges" => {
557                    let edge_id = HashId(parse_blob(item, pk_column));
558                    let source_node_id = parse_hashid(item, 1);
559                    let target_node_id = parse_hashid(item, 4);
560
561                    created_edges.push(Edge {
562                        id: edge_id,
563                        source_node_id,
564                        source_coordinate: parse_number(item, 2),
565                        source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
566                        target_node_id,
567                        target_coordinate: parse_number(item, 5),
568                        target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
569                    });
570
571                    created_edges_set.insert(edge_id);
572                    let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
573                    for node in nodes.iter() {
574                        if !created_nodes_set.contains(&node.id) && !is_terminal(node.id) {
575                            previous_sequences.insert(node.sequence_hash);
576                            previous_nodes.insert(node.id);
577                        }
578                    }
579                }
580                "block_group_edges" => {
581                    let bg_id = HashId(parse_blob(item, 1));
582                    let edge_id = HashId(parse_blob(item, 2));
583
584                    created_bg_edges.push(BlockGroupEdge {
585                        id: HashId(parse_blob(item, pk_column)),
586                        block_group_id: bg_id,
587                        edge_id,
588                        chromosome_index: parse_number(item, 3),
589                        phased: parse_number(item, 4),
590                        created_on: parse_number(item, 5),
591                    });
592
593                    if !created_edges_set.contains(&edge_id) {
594                        previous_edges.insert(edge_id);
595                    }
596                    if !created_block_groups_set.contains(&bg_id) {
597                        previous_block_groups.insert(bg_id);
598                    }
599                }
600                "samples" => {
601                    let name = parse_string(item, pk_column);
602                    created_samples.push(Sample { name: name.clone() });
603                    created_samples_set.insert(name);
604                }
605                "sample_lineage" => {
606                    let parent_sample_name = parse_string(item, 0);
607                    let child_sample_name = parse_string(item, 1);
608
609                    created_sample_lineages.push(SampleLineage {
610                        parent_sample_name: parent_sample_name.clone(),
611                        child_sample_name: child_sample_name.clone(),
612                    });
613
614                    if !created_samples_set.contains(&parent_sample_name) {
615                        previous_samples.insert(parent_sample_name);
616                    }
617                    if !created_samples_set.contains(&child_sample_name) {
618                        previous_samples.insert(child_sample_name);
619                    }
620                }
621                "collections" => {
622                    let name = parse_string(item, pk_column);
623                    created_collections.push(Collection { name: name.clone() });
624                    created_collections_set.insert(name);
625                }
626                "paths" => {
627                    let path_id = HashId(parse_blob(item, pk_column));
628                    let bg_id = HashId(parse_blob(item, 1));
629
630                    created_paths.push(Path {
631                        id: path_id,
632                        block_group_id: bg_id,
633                        name: parse_string(item, 2),
634                        created_on: parse_number(item, 3),
635                    });
636
637                    created_paths_set.insert(path_id);
638                    if !created_block_groups_set.contains(&bg_id) {
639                        previous_block_groups.insert(bg_id);
640                    }
641                }
642                "path_edges" => {
643                    let path_id = HashId(parse_blob(item, 1));
644                    let edge_id = HashId(parse_blob(item, 2));
645
646                    created_path_edges.push(PathEdge {
647                        id: HashId(parse_blob(item, pk_column)),
648                        path_id,
649                        edge_id,
650                        index_in_path: parse_number(item, 3),
651                    });
652
653                    if !created_paths_set.contains(&path_id) {
654                        previous_paths.insert(path_id);
655                    }
656                    if !created_edges_set.contains(&edge_id) {
657                        previous_edges.insert(edge_id);
658                    }
659                }
660                "accessions" => {
661                    let accession_id = HashId(parse_blob(item, pk_column));
662                    let path_id = HashId(parse_blob(item, 2));
663                    let parent_accession_id = parse_maybe_hashid(item, 3);
664
665                    created_accessions.push(Accession {
666                        id: accession_id,
667                        name: parse_string(item, 1),
668                        path_id,
669                        parent_accession_id,
670                    });
671
672                    created_accessions_set.insert(accession_id);
673                    if !created_paths_set.contains(&path_id) {
674                        previous_paths.insert(path_id);
675                    }
676                    if let Some(id) = parent_accession_id
677                        && !created_accessions_set.contains(&id)
678                    {
679                        previous_accessions.insert(id);
680                    }
681                }
682                "accession_edges" => {
683                    let edge_id = parse_hashid(item, pk_column);
684                    let source_node_id = parse_hashid(item, 1);
685                    let target_node_id = parse_hashid(item, 4);
686
687                    created_accession_edges.push(AccessionEdge {
688                        id: edge_id,
689                        source_node_id,
690                        source_coordinate: parse_number(item, 2),
691                        source_strand: Strand::column_result(item.new_value(3).unwrap()).unwrap(),
692                        target_node_id,
693                        target_coordinate: parse_number(item, 5),
694                        target_strand: Strand::column_result(item.new_value(6).unwrap()).unwrap(),
695                        chromosome_index: parse_number(item, 7),
696                    });
697
698                    created_accession_edges_set.insert(edge_id);
699                    let nodes = Node::query_by_ids(conn, &[source_node_id, target_node_id]);
700                    if !created_nodes_set.contains(&source_node_id) {
701                        previous_sequences.insert(nodes[0].sequence_hash);
702                    }
703                    if source_node_id != target_node_id
704                        && !created_nodes_set.contains(&target_node_id)
705                    {
706                        previous_sequences.insert(nodes[1].sequence_hash);
707                    }
708                }
709                "accession_paths" => {
710                    let accession_id = parse_hashid(item, 1);
711                    let edge_id = parse_hashid(item, 3);
712
713                    created_accession_paths.push(AccessionPath {
714                        id: parse_hashid(item, pk_column),
715                        accession_id,
716                        index_in_path: parse_number(item, 2),
717                        edge_id,
718                    });
719
720                    if !created_accessions_set.contains(&accession_id) {
721                        previous_accessions.insert(accession_id);
722                    }
723                    if !created_accession_edges_set.contains(&edge_id) {
724                        previous_accession_edges.insert(edge_id);
725                    }
726                }
727                "annotations" => {
728                    let id = parse_hashid(item, pk_column);
729                    let name = parse_string(item, 1);
730                    let group = parse_string(item, 2);
731                    let accession_id = parse_hashid(item, 3);
732
733                    created_annotations.push(Annotation {
734                        id,
735                        name,
736                        group,
737                        accession_id,
738                    });
739
740                    if !created_accessions_set.contains(&accession_id) {
741                        previous_accessions.insert(accession_id);
742                    }
743                }
744                "annotation_groups" => {
745                    let name = parse_string(item, pk_column);
746
747                    created_annotation_groups.push(AnnotationGroup { name });
748                }
749                "annotation_group_samples" => {
750                    let annotation_group = parse_string(item, 0);
751                    let sample_name = parse_string(item, 1);
752
753                    created_annotation_group_samples.push(AnnotationGroupSample {
754                        annotation_group,
755                        sample_name: sample_name.clone(),
756                    });
757
758                    if !created_samples_set.contains(&sample_name) {
759                        previous_samples.insert(sample_name);
760                    }
761                }
762                t => {
763                    println!("unhandled table {t}")
764                }
765            }
766        }
767    }
768
769    // Process additional dependencies for edges
770    let existing_edges = Edge::query_by_ids(
771        conn,
772        &previous_edges.clone().into_iter().collect::<Vec<_>>(),
773    );
774    let mut new_nodes = vec![];
775    for edge in existing_edges.iter() {
776        if !previous_nodes.contains(&edge.source_node_id) && !is_terminal(edge.source_node_id) {
777            previous_nodes.insert(edge.source_node_id);
778            new_nodes.push(edge.source_node_id);
779        }
780        if !previous_nodes.contains(&edge.target_node_id) && !is_terminal(edge.target_node_id) {
781            previous_nodes.insert(edge.target_node_id);
782            new_nodes.push(edge.target_node_id);
783        }
784    }
785    for node in Node::query_by_ids(conn, &new_nodes) {
786        previous_sequences.insert(node.sequence_hash);
787    }
788
789    let changeset_models = ChangesetModels {
790        sequences: created_sequences,
791        block_groups: created_block_groups,
792        nodes: created_nodes,
793        edges: created_edges,
794        block_group_edges: created_bg_edges,
795        samples: created_samples,
796        sample_lineages: created_sample_lineages,
797        collections: created_collections,
798        paths: created_paths,
799        path_edges: created_path_edges,
800        accessions: created_accessions,
801        accession_edges: created_accession_edges,
802        accession_paths: created_accession_paths,
803        annotation_groups: created_annotation_groups,
804        annotations: created_annotations,
805        annotation_group_samples: created_annotation_group_samples,
806    };
807
808    let dependency_models = DependencyModels {
809        collections: Collection::query_by_ids(conn, &previous_collections),
810        samples: Sample::query_by_ids(conn, &previous_samples),
811        sequences: Sequence::query_by_ids(conn, &previous_sequences),
812        block_group: BlockGroup::query_by_ids(conn, &previous_block_groups),
813        nodes: Node::query_by_ids(conn, &previous_nodes),
814        edges: Edge::query_by_ids(conn, &previous_edges),
815        paths: Path::query_by_ids(conn, &previous_paths),
816        accessions: Accession::query_by_ids(conn, &previous_accessions),
817        accession_edges: AccessionEdge::query_by_ids(conn, &previous_accession_edges),
818    };
819
820    (changeset_models, dependency_models)
821}
822
823pub fn apply_changeset(
824    conn: &GraphConnection,
825    changeset: &ChangesetModels,
826    dependencies: &DependencyModels,
827) -> Result<(), ChangesetError> {
828    for collection in dependencies.collections.iter() {
829        Collection::create(conn, &collection.name);
830    }
831
832    for sample in dependencies.samples.iter() {
833        Sample::get_or_create(conn, &sample.name);
834    }
835
836    for sequence in dependencies.sequences.iter() {
837        NewSequence::from(sequence).save(conn);
838    }
839    for node in dependencies.nodes.iter() {
840        if !is_terminal(node.id) {
841            assert!(Sequence::get_by_id(conn, &node.sequence_hash).is_some());
842        }
843    }
844
845    for bg in block_groups_parent_first(&dependencies.block_group) {
846        BlockGroup::create(
847            conn,
848            NewBlockGroup {
849                collection_name: &bg.collection_name,
850                sample_name: &bg.sample_name,
851                name: &bg.name,
852                parent_block_group_id: bg.parent_block_group_id.as_ref(),
853                is_default: bg.is_default,
854            },
855        );
856    }
857
858    for node in dependencies.nodes.iter() {
859        Node::create(conn, &node.sequence_hash, &node.id);
860    }
861
862    Edge::bulk_create(
863        conn,
864        &dependencies
865            .edges
866            .iter()
867            .map(EdgeData::from)
868            .collect::<Vec<_>>(),
869    );
870
871    for path in dependencies.paths.iter() {
872        Path::create(conn, &path.name, &path.block_group_id, &[]);
873    }
874
875    AccessionEdge::bulk_create(
876        conn,
877        &dependencies
878            .accession_edges
879            .iter()
880            .map(AccessionEdgeData::from)
881            .collect::<Vec<AccessionEdgeData>>(),
882    );
883
884    for accession in dependencies.accessions.iter() {
885        Accession::get_or_create(
886            conn,
887            &accession.name,
888            &accession.path_id,
889            accession.parent_accession_id.as_ref(),
890        );
891    }
892
893    for collection in &changeset.collections {
894        Collection::create(conn, &collection.name);
895    }
896    for sample in &changeset.samples {
897        Sample::get_or_create(conn, &sample.name);
898    }
899    for sample_lineage in &changeset.sample_lineages {
900        SampleLineage::create(
901            conn,
902            &sample_lineage.parent_sample_name,
903            &sample_lineage.child_sample_name,
904        )?;
905    }
906    for sequence in &changeset.sequences {
907        NewSequence::from(sequence).save(conn);
908    }
909    for bg in block_groups_parent_first(&changeset.block_groups) {
910        BlockGroup::create(
911            conn,
912            NewBlockGroup {
913                collection_name: &bg.collection_name,
914                sample_name: &bg.sample_name,
915                name: &bg.name,
916                parent_block_group_id: bg.parent_block_group_id.as_ref(),
917                is_default: bg.is_default,
918            },
919        );
920    }
921    for node in &changeset.nodes {
922        Node::create(conn, &node.sequence_hash, &node.id);
923    }
924
925    Edge::bulk_create(
926        conn,
927        &changeset
928            .edges
929            .iter()
930            .map(EdgeData::from)
931            .collect::<Vec<_>>(),
932    );
933    BlockGroupEdge::bulk_create(
934        conn,
935        &changeset
936            .block_group_edges
937            .iter()
938            .map(BlockGroupEdgeData::from)
939            .collect::<Vec<BlockGroupEdgeData>>(),
940    );
941
942    for path in &changeset.paths {
943        Path::create(conn, &path.name, &path.block_group_id, &[]);
944        let edges = changeset
945            .path_edges
946            .iter()
947            .filter(|edge| edge.path_id == path.id)
948            .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
949            .map(|path_edge| path_edge.edge_id);
950        PathEdge::bulk_create(conn, &path.id, &edges.collect::<Vec<_>>());
951    }
952
953    AccessionEdge::bulk_create(
954        conn,
955        &changeset
956            .accession_edges
957            .iter()
958            .map(AccessionEdgeData::from)
959            .collect::<Vec<_>>(),
960    );
961
962    for accession in &changeset.accessions {
963        Accession::get_or_create(
964            conn,
965            &accession.name,
966            &accession.path_id,
967            accession.parent_accession_id.as_ref(),
968        );
969        let edges = changeset
970            .accession_paths
971            .iter()
972            .filter(|ap| ap.accession_id == accession.id)
973            .sorted_by(|e1, e2| Ord::cmp(&e1.index_in_path, &e2.index_in_path))
974            .map(|ap| ap.edge_id);
975        AccessionPath::create(conn, &accession.id, &edges.collect::<Vec<_>>());
976    }
977
978    for annotation_group in &changeset.annotation_groups {
979        AnnotationGroup::get_or_create(conn, &annotation_group.name).map_err(|err| match err {
980            AnnotationGroupError::DatabaseError(inner) => inner,
981        })?;
982    }
983
984    for annotation in &changeset.annotations {
985        Annotation::get_or_create(
986            conn,
987            &annotation.name,
988            &annotation.group,
989            &annotation.accession_id,
990        )
991        .map_err(|err| match err {
992            AnnotationError::DatabaseError(inner) => inner,
993            AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
994                inner
995            }
996        })?;
997    }
998
999    for annotation_group_sample in &changeset.annotation_group_samples {
1000        AnnotationGroupSample::create(
1001            conn,
1002            &annotation_group_sample.annotation_group,
1003            &annotation_group_sample.sample_name,
1004        )
1005        .map_err(|err| match err {
1006            AnnotationError::DatabaseError(inner) => inner,
1007            AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
1008                inner
1009            }
1010        })?;
1011    }
1012    Ok(())
1013}
1014
1015pub fn revert_changeset(
1016    conn: &GraphConnection,
1017    changeset: &ChangesetModels,
1018) -> Result<(), ChangesetError> {
1019    for sample_lineage in &changeset.sample_lineages {
1020        SampleLineage::delete(
1021            conn,
1022            &sample_lineage.parent_sample_name,
1023            &sample_lineage.child_sample_name,
1024        )?;
1025    }
1026    for annotation_group_sample in &changeset.annotation_group_samples {
1027        AnnotationGroupSample::delete(
1028            conn,
1029            &annotation_group_sample.annotation_group,
1030            &annotation_group_sample.sample_name,
1031        )
1032        .map_err(|err| match err {
1033            AnnotationError::DatabaseError(inner) => inner,
1034            AnnotationError::AnnotationGroupError(AnnotationGroupError::DatabaseError(inner)) => {
1035                inner
1036            }
1037        })?;
1038    }
1039    Annotation::delete_by_ids(
1040        conn,
1041        &changeset
1042            .annotations
1043            .iter()
1044            .map(|obj| obj.id)
1045            .collect::<Vec<_>>(),
1046    );
1047    AnnotationGroup::delete_by_ids(
1048        conn,
1049        &changeset
1050            .annotation_groups
1051            .iter()
1052            .map(|group| group.name.clone())
1053            .collect::<Vec<_>>(),
1054    );
1055    AccessionPath::delete_by_ids(
1056        conn,
1057        &changeset
1058            .accession_paths
1059            .iter()
1060            .map(|obj| obj.id)
1061            .collect::<Vec<_>>(),
1062    );
1063    // TODO: edges can be made by other operations in other dbs earlier and reused, so we likely want to allow FK failures delete for deletions
1064    AccessionEdge::delete_by_ids(
1065        conn,
1066        &changeset
1067            .accession_edges
1068            .iter()
1069            .map(|pe| pe.id)
1070            .collect::<Vec<_>>(),
1071    );
1072    Accession::delete_by_ids(
1073        conn,
1074        &changeset
1075            .accessions
1076            .iter()
1077            .map(|obj| obj.id)
1078            .collect::<Vec<_>>(),
1079    );
1080    PathEdge::delete_by_ids(
1081        conn,
1082        &changeset
1083            .path_edges
1084            .iter()
1085            .map(|pe| pe.id)
1086            .collect::<Vec<_>>(),
1087    );
1088    Path::delete_by_ids(
1089        conn,
1090        &changeset.paths.iter().map(|p| p.id).collect::<Vec<_>>(),
1091    );
1092
1093    BlockGroupEdge::delete_by_ids(
1094        conn,
1095        &changeset
1096            .block_group_edges
1097            .iter()
1098            .map(|obj| obj.id)
1099            .collect::<Vec<_>>(),
1100    );
1101    Edge::delete_by_ids(
1102        conn,
1103        &changeset.edges.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1104    );
1105
1106    BlockGroup::delete_by_ids(
1107        conn,
1108        &changeset
1109            .block_groups
1110            .iter()
1111            .map(|obj| obj.id)
1112            .collect::<Vec<_>>(),
1113    );
1114
1115    Node::delete_by_ids(
1116        conn,
1117        &changeset.nodes.iter().map(|obj| obj.id).collect::<Vec<_>>(),
1118    );
1119    Collection::delete_by_ids(
1120        conn,
1121        &changeset
1122            .collections
1123            .iter()
1124            .map(|obj| obj.name.clone())
1125            .collect::<Vec<_>>(),
1126    );
1127    Sample::delete_by_ids(
1128        conn,
1129        &changeset
1130            .samples
1131            .iter()
1132            .map(|obj| obj.name.clone())
1133            .collect::<Vec<_>>(),
1134    );
1135    Sequence::delete_by_ids(
1136        conn,
1137        &changeset
1138            .sequences
1139            .iter()
1140            .map(|obj| obj.hash)
1141            .collect::<Vec<_>>(),
1142    );
1143    Ok(())
1144}
1145
1146pub fn get_changeset_from_path(path: PathBuf) -> DatabaseChangeset {
1147    use capnp::serialize_packed;
1148    let file = fs::File::open(path).unwrap();
1149    let mut reader = std::io::BufReader::new(file);
1150
1151    let message_reader =
1152        serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1153    let root = message_reader
1154        .get_root::<database_changeset::Reader>()
1155        .unwrap();
1156    DatabaseChangeset::read_capnp(root)
1157}
1158
1159pub fn get_changeset_dependencies_from_path(path: PathBuf) -> DependencyModels {
1160    use capnp::serialize_packed;
1161
1162    let file = fs::File::open(path).unwrap();
1163    let mut reader = std::io::BufReader::new(file);
1164    let message_reader =
1165        serialize_packed::read_message(&mut reader, capnp::message::ReaderOptions::new()).unwrap();
1166    let root = message_reader
1167        .get_root::<crate::gen_models_capnp::dependency_models::Reader>()
1168        .unwrap();
1169    DependencyModels::read_capnp(root)
1170}
1171
1172// This sorts parents first so when creating blockgroups, we don't have foreign key issues when a child is made before a parent.
1173fn block_groups_parent_first(block_groups: &[BlockGroup]) -> Vec<&BlockGroup> {
1174    let by_id = block_groups
1175        .iter()
1176        .map(|block_group| (block_group.id, block_group))
1177        .collect::<HashMap<_, _>>();
1178
1179    fn walk<'a>(
1180        block_group_id: HashId,
1181        by_id: &HashMap<HashId, &'a BlockGroup>,
1182        visiting: &mut HashSet<HashId>,
1183        visited: &mut HashSet<HashId>,
1184        ordered: &mut Vec<&'a BlockGroup>,
1185    ) {
1186        if visited.contains(&block_group_id) || !visiting.insert(block_group_id) {
1187            return;
1188        }
1189
1190        let block_group = by_id[&block_group_id];
1191        if let Some(parent_id) = block_group.parent_block_group_id
1192            && by_id.contains_key(&parent_id)
1193        {
1194            walk(parent_id, by_id, visiting, visited, ordered);
1195        }
1196
1197        visiting.remove(&block_group_id);
1198        if visited.insert(block_group_id) {
1199            ordered.push(block_group);
1200        }
1201    }
1202
1203    let mut ordered = Vec::new();
1204    let mut visiting = HashSet::new();
1205    let mut visited = HashSet::new();
1206    for block_group in block_groups {
1207        walk(
1208            block_group.id,
1209            &by_id,
1210            &mut visiting,
1211            &mut visited,
1212            &mut ordered,
1213        );
1214    }
1215
1216    ordered
1217}
1218
1219pub fn write_changeset(
1220    workspace: &Workspace,
1221    operation: &Operation,
1222    changes: DatabaseChangeset,
1223    dependencies: &DependencyModels,
1224) {
1225    use capnp::{message::Builder, serialize_packed};
1226
1227    let change_path = operation.get_changeset_path(workspace);
1228    let dependency_path = operation.get_changeset_dependencies_path(workspace);
1229
1230    // Write dependencies using capnp
1231    let mut dependency_file = fs::File::create_new(&dependency_path)
1232        .unwrap_or_else(|_| panic!("Unable to open {dependency_path:?}"));
1233    let mut message = Builder::new_default();
1234    let mut dep_root = message.init_root();
1235    dependencies.write_capnp(&mut dep_root);
1236    serialize_packed::write_message(&mut dependency_file, &message).unwrap();
1237
1238    // Write changes using capnp
1239    let mut file = fs::File::create_new(&change_path)
1240        .unwrap_or_else(|_| panic!("Unable to open {change_path:?}"));
1241    let mut message = Builder::new_default();
1242    let mut change_root = message.init_root();
1243    changes.write_capnp(&mut change_root);
1244    serialize_packed::write_message(&mut file, &message).unwrap();
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249    use chrono::Utc;
1250
1251    use super::*;
1252    use crate::{
1253        file_types::FileTypes,
1254        operations::{OperationFile, OperationInfo},
1255        session_operations::{end_operation, start_operation},
1256        test_helpers::{setup_block_group, setup_gen},
1257        traits::Query,
1258    };
1259
1260    #[test]
1261    fn test_database_changeset_capnp_serialization() {
1262        use capnp::message::TypedBuilder;
1263        use gen_core::Strand;
1264
1265        let changeset_models = ChangesetModels {
1266            collections: vec![crate::collection::Collection {
1267                name: "test_collection".to_string(),
1268            }],
1269            samples: vec![crate::sample::Sample {
1270                name: "test_sample".to_string(),
1271            }],
1272            sample_lineages: vec![SampleLineage {
1273                parent_sample_name: "parent_sample".to_string(),
1274                child_sample_name: "test_sample".to_string(),
1275            }],
1276            sequences: vec![
1277                NewSequence::new()
1278                    .sequence("ATCG")
1279                    .sequence_type("DNA")
1280                    .name("test_seq")
1281                    .build(),
1282            ],
1283            block_groups: vec![BlockGroup {
1284                id: HashId::pad_str(1),
1285                collection_name: "test_collection".to_string(),
1286                sample_name: "test_sample".to_string(),
1287                name: "test_bg".to_string(),
1288                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1289                parent_block_group_id: None,
1290                is_default: false,
1291            }],
1292            nodes: vec![
1293                Node {
1294                    id: HashId::convert_str("node_hash"),
1295                    sequence_hash: HashId::convert_str("test_hash"),
1296                },
1297                Node {
1298                    id: HashId::convert_str("node_hash_2"),
1299                    sequence_hash: HashId::convert_str("test_hash_2"),
1300                },
1301            ],
1302            edges: vec![Edge {
1303                id: HashId::pad_str(1),
1304                source_node_id: HashId::convert_str("1"),
1305                source_coordinate: 0,
1306                source_strand: Strand::Forward,
1307                target_node_id: HashId::convert_str("2"),
1308                target_coordinate: 0,
1309                target_strand: Strand::Forward,
1310            }],
1311            block_group_edges: vec![crate::block_group_edge::BlockGroupEdge {
1312                id: HashId::pad_str(1),
1313                block_group_id: HashId::pad_str(1),
1314                edge_id: HashId::pad_str(1),
1315                chromosome_index: 0,
1316                phased: 0,
1317                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1318            }],
1319            paths: vec![Path {
1320                id: HashId::pad_str(1),
1321                block_group_id: HashId::pad_str(1),
1322                name: "test_path".to_string(),
1323                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1324            }],
1325            path_edges: vec![PathEdge {
1326                id: HashId::pad_str(1),
1327                path_id: HashId::pad_str(1),
1328                index_in_path: 0,
1329                edge_id: HashId::pad_str(1),
1330            }],
1331            accessions: vec![Accession {
1332                id: HashId::pad_str(1),
1333                name: "test_accession".to_string(),
1334                path_id: HashId::pad_str(1),
1335                parent_accession_id: None,
1336            }],
1337            accession_edges: vec![AccessionEdge {
1338                id: HashId::pad_str(1),
1339                source_node_id: HashId::convert_str("1"),
1340                source_coordinate: 0,
1341                source_strand: Strand::Forward,
1342                target_node_id: HashId::convert_str("2"),
1343                target_coordinate: 0,
1344                target_strand: Strand::Forward,
1345                chromosome_index: 0,
1346            }],
1347            accession_paths: vec![AccessionPath {
1348                id: HashId::pad_str(1),
1349                accession_id: HashId::pad_str(1),
1350                index_in_path: 0,
1351                edge_id: HashId::pad_str(1),
1352            }],
1353            annotation_groups: vec![AnnotationGroup {
1354                name: "gff3".to_string(),
1355            }],
1356            annotations: vec![Annotation {
1357                id: HashId::pad_str(1),
1358                name: "test_annotation".to_string(),
1359                group: "gff3".to_string(),
1360                accession_id: HashId::pad_str(1),
1361            }],
1362            annotation_group_samples: vec![AnnotationGroupSample {
1363                annotation_group: "gff3".to_string(),
1364                sample_name: "test_sample".to_string(),
1365            }],
1366        };
1367
1368        let changeset = DatabaseChangeset {
1369            db_path: "/path/to/db".to_string(),
1370            changes: changeset_models,
1371        };
1372
1373        let mut message = TypedBuilder::<database_changeset::Owned>::new_default();
1374        let mut root = message.init_root();
1375        changeset.write_capnp(&mut root);
1376
1377        let deserialized = DatabaseChangeset::read_capnp(root.into_reader());
1378        assert_eq!(changeset, deserialized);
1379    }
1380
1381    #[test]
1382    fn test_database_changeset_get_db_path() {
1383        use std::io::Write;
1384
1385        use capnp::{message::Builder, serialize_packed};
1386        use tempfile::NamedTempFile;
1387
1388        let expected_db_path = "/tmp/test_db_path";
1389        let changeset = DatabaseChangeset {
1390            db_path: expected_db_path.to_string(),
1391            changes: ChangesetModels {
1392                collections: vec![],
1393                samples: vec![],
1394                sample_lineages: vec![],
1395                sequences: vec![],
1396                block_groups: vec![],
1397                nodes: vec![],
1398                edges: vec![],
1399                block_group_edges: vec![],
1400                paths: vec![],
1401                path_edges: vec![],
1402                accessions: vec![],
1403                accession_edges: vec![],
1404                accession_paths: vec![],
1405                annotation_groups: vec![],
1406                annotations: vec![],
1407                annotation_group_samples: vec![],
1408            },
1409        };
1410
1411        let mut temp_file = NamedTempFile::new().unwrap();
1412        let path = temp_file.path().to_path_buf();
1413
1414        let mut message = Builder::new_default();
1415        let mut root = message.init_root();
1416        changeset.write_capnp(&mut root);
1417        serialize_packed::write_message(&mut temp_file, &message).unwrap();
1418        temp_file.flush().unwrap();
1419
1420        let db_path = DatabaseChangeset::get_db_path(path.as_path());
1421
1422        assert_eq!(db_path, expected_db_path);
1423    }
1424
1425    #[test]
1426    fn test_changeset_models_capnp_serialization() {
1427        use capnp::message::TypedBuilder;
1428        use gen_core::Strand;
1429
1430        let changeset_models = ChangesetModels {
1431            collections: vec![crate::collection::Collection {
1432                name: "test_collection".to_string(),
1433            }],
1434            samples: vec![crate::sample::Sample {
1435                name: "test_sample".to_string(),
1436            }],
1437            sample_lineages: vec![SampleLineage {
1438                parent_sample_name: "parent_sample".to_string(),
1439                child_sample_name: "test_sample".to_string(),
1440            }],
1441            sequences: vec![
1442                NewSequence::new()
1443                    .sequence("ATCG")
1444                    .sequence_type("DNA")
1445                    .name("test_seq")
1446                    .build(),
1447            ],
1448            block_groups: vec![BlockGroup {
1449                id: HashId::pad_str(1),
1450                collection_name: "test_collection".to_string(),
1451                sample_name: "test_sample".to_string(),
1452                name: "test_bg".to_string(),
1453                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1454                parent_block_group_id: None,
1455                is_default: false,
1456            }],
1457            nodes: vec![
1458                Node {
1459                    id: HashId::convert_str("node_hash"),
1460                    sequence_hash: HashId::convert_str("test_hash"),
1461                },
1462                Node {
1463                    id: HashId::convert_str("node_hash_2"),
1464                    sequence_hash: HashId::convert_str("test_hash_2"),
1465                },
1466            ],
1467            edges: vec![Edge {
1468                id: HashId::pad_str(1),
1469                source_node_id: HashId::convert_str("1"),
1470                source_coordinate: 0,
1471                source_strand: Strand::Forward,
1472                target_node_id: HashId::convert_str("2"),
1473                target_coordinate: 0,
1474                target_strand: Strand::Forward,
1475            }],
1476            block_group_edges: vec![crate::block_group_edge::BlockGroupEdge {
1477                id: HashId::pad_str(1),
1478                block_group_id: HashId::pad_str(1),
1479                edge_id: HashId::pad_str(1),
1480                chromosome_index: 0,
1481                phased: 0,
1482                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1483            }],
1484            paths: vec![Path {
1485                id: HashId::pad_str(1),
1486                block_group_id: HashId::pad_str(1),
1487                name: "test_path".to_string(),
1488                created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1489            }],
1490            path_edges: vec![PathEdge {
1491                id: HashId::pad_str(1),
1492                path_id: HashId::pad_str(1),
1493                index_in_path: 0,
1494                edge_id: HashId::pad_str(1),
1495            }],
1496            accessions: vec![Accession {
1497                id: HashId::pad_str(1),
1498                name: "test_accession".to_string(),
1499                path_id: HashId::pad_str(1),
1500                parent_accession_id: None,
1501            }],
1502            accession_edges: vec![AccessionEdge {
1503                id: HashId::pad_str(1),
1504                source_node_id: HashId::convert_str("1"),
1505                source_coordinate: 0,
1506                source_strand: Strand::Forward,
1507                target_node_id: HashId::convert_str("2"),
1508                target_coordinate: 0,
1509                target_strand: Strand::Forward,
1510                chromosome_index: 0,
1511            }],
1512            accession_paths: vec![AccessionPath {
1513                id: HashId::pad_str(1),
1514                accession_id: HashId::pad_str(1),
1515                index_in_path: 0,
1516                edge_id: HashId::pad_str(1),
1517            }],
1518            annotation_groups: vec![AnnotationGroup {
1519                name: "gff3".to_string(),
1520            }],
1521            annotations: vec![Annotation {
1522                id: HashId::pad_str(1),
1523                name: "test_annotation".to_string(),
1524                group: "gff3".to_string(),
1525                accession_id: HashId::pad_str(1),
1526            }],
1527            annotation_group_samples: vec![AnnotationGroupSample {
1528                annotation_group: "gff3".to_string(),
1529                sample_name: "test_sample".to_string(),
1530            }],
1531        };
1532
1533        let mut message = TypedBuilder::<changeset_models::Owned>::new_default();
1534        let mut root = message.init_root();
1535        changeset_models.write_capnp(&mut root);
1536
1537        let deserialized = ChangesetModels::read_capnp(root.into_reader());
1538
1539        assert_eq!(changeset_models, deserialized);
1540    }
1541
1542    #[test]
1543    fn test_changeset_includes_annotations() {
1544        use crate::block_group::PathCache;
1545
1546        let context = setup_gen();
1547        let conn = context.graph().conn();
1548        let op_conn = context.operations().conn();
1549
1550        let db_uuid = crate::metadata::get_db_uuid(conn);
1551        crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path").unwrap();
1552
1553        let _ = Sample::create(conn, "sample-1").unwrap();
1554        let (block_group_id, path) = setup_block_group(conn);
1555        let mut cache = PathCache::new(conn);
1556        let _ = PathCache::lookup(&mut cache, &block_group_id, path.name.clone());
1557        let accession = BlockGroup::add_accession(conn, &path, "ann-accession", 0, 5, &mut cache);
1558
1559        let mut session = start_operation(conn);
1560        let annotation = Annotation::get_or_create(conn, "gene-a", "gff3", &accession.id).unwrap();
1561        annotation.add_samples(conn, &["sample-1"]).unwrap();
1562
1563        let operation = end_operation(
1564            &context,
1565            &mut session,
1566            &OperationInfo {
1567                files: vec![],
1568                description: "annotation op".to_string(),
1569            },
1570            "annotation op",
1571            None,
1572        )
1573        .unwrap();
1574
1575        let changeset = operation.get_changeset(context.workspace());
1576        assert_eq!(
1577            changeset.changes.annotation_groups,
1578            vec![AnnotationGroup {
1579                name: "gff3".to_string(),
1580            }]
1581        );
1582        assert_eq!(changeset.changes.annotations, vec![annotation.clone()]);
1583        assert_eq!(
1584            changeset.changes.annotation_group_samples,
1585            vec![AnnotationGroupSample {
1586                annotation_group: annotation.group.clone(),
1587                sample_name: "sample-1".to_string(),
1588            }]
1589        );
1590
1591        let dependencies = operation.get_changeset_dependencies(context.workspace());
1592        assert_eq!(dependencies.samples.len(), 1);
1593        assert_eq!(dependencies.samples[0].name, "sample-1");
1594        assert_eq!(dependencies.accessions.len(), 1);
1595        assert_eq!(dependencies.accessions[0].id, accession.id);
1596    }
1597
1598    #[test]
1599    fn test_changeset_includes_sample_lineage() {
1600        let context = setup_gen();
1601        let conn = context.graph().conn();
1602        let op_conn = context.operations().conn();
1603
1604        let db_uuid = crate::metadata::get_db_uuid(conn);
1605        crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path").unwrap();
1606
1607        let _ = Sample::create(conn, "parent").unwrap();
1608
1609        let mut session = start_operation(conn);
1610        let _ = Sample::create(conn, "child").unwrap();
1611        SampleLineage::create(conn, "parent", "child").unwrap();
1612
1613        let operation = end_operation(
1614            &context,
1615            &mut session,
1616            &OperationInfo {
1617                files: vec![],
1618                description: "sample lineage op".to_string(),
1619            },
1620            "sample lineage op",
1621            None,
1622        )
1623        .unwrap();
1624
1625        let changeset = operation.get_changeset(context.workspace());
1626        assert_eq!(
1627            changeset.changes.sample_lineages,
1628            vec![SampleLineage {
1629                parent_sample_name: "parent".to_string(),
1630                child_sample_name: "child".to_string(),
1631            }]
1632        );
1633
1634        let dependencies = operation.get_changeset_dependencies(context.workspace());
1635        assert_eq!(dependencies.samples.len(), 1);
1636        assert_eq!(dependencies.samples[0].name, "parent");
1637    }
1638
1639    #[test]
1640    fn test_block_groups_parent_first() {
1641        let parent = BlockGroup {
1642            id: HashId::pad_str(1),
1643            collection_name: "test".to_string(),
1644            sample_name: "parent".to_string(),
1645            name: "bg".to_string(),
1646            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1647            parent_block_group_id: None,
1648            is_default: false,
1649        };
1650        let child = BlockGroup {
1651            id: HashId::pad_str(2),
1652            collection_name: "test".to_string(),
1653            sample_name: "child".to_string(),
1654            name: "bg".to_string(),
1655            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1656            parent_block_group_id: Some(parent.id),
1657            is_default: false,
1658        };
1659
1660        let block_groups = [child.clone(), parent.clone()];
1661        let ordered = block_groups_parent_first(&block_groups);
1662        assert_eq!(ordered, vec![&parent, &child]);
1663    }
1664
1665    #[test]
1666    fn test_block_groups_parent_first_with_three_level_lineage() {
1667        let parent = BlockGroup {
1668            id: HashId::pad_str(1),
1669            collection_name: "test".to_string(),
1670            sample_name: "parent".to_string(),
1671            name: "bg".to_string(),
1672            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1673            parent_block_group_id: None,
1674            is_default: false,
1675        };
1676        let child = BlockGroup {
1677            id: HashId::pad_str(2),
1678            collection_name: "test".to_string(),
1679            sample_name: "child".to_string(),
1680            name: "bg".to_string(),
1681            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1682            parent_block_group_id: Some(parent.id),
1683            is_default: false,
1684        };
1685        let grandchild = BlockGroup {
1686            id: HashId::pad_str(3),
1687            collection_name: "test".to_string(),
1688            sample_name: "grandchild".to_string(),
1689            name: "bg".to_string(),
1690            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1691            parent_block_group_id: Some(child.id),
1692            is_default: false,
1693        };
1694
1695        let block_groups = [grandchild.clone(), child.clone(), parent.clone()];
1696        let ordered = block_groups_parent_first(&block_groups);
1697        assert_eq!(ordered, vec![&parent, &child, &grandchild]);
1698    }
1699
1700    #[test]
1701    fn test_block_groups_parent_first_when_every_entry_has_a_parent() {
1702        let root_id = HashId::pad_str(1);
1703        let child = BlockGroup {
1704            id: HashId::pad_str(2),
1705            collection_name: "test".to_string(),
1706            sample_name: "child".to_string(),
1707            name: "bg".to_string(),
1708            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1709            parent_block_group_id: Some(root_id),
1710            is_default: false,
1711        };
1712        let grandchild = BlockGroup {
1713            id: HashId::pad_str(3),
1714            collection_name: "test".to_string(),
1715            sample_name: "grandchild".to_string(),
1716            name: "bg".to_string(),
1717            created_on: Utc::now().timestamp_nanos_opt().unwrap(),
1718            parent_block_group_id: Some(child.id),
1719            is_default: false,
1720        };
1721
1722        let block_groups = [grandchild.clone(), child.clone()];
1723        let ordered = block_groups_parent_first(&block_groups);
1724        assert_eq!(ordered, vec![&child, &grandchild]);
1725    }
1726
1727    #[cfg(test)]
1728    mod changeset_dependencies {
1729        use super::*;
1730
1731        #[test]
1732        fn test_tracks_nodes_and_sequences_from_previous_block_group_edges() {
1733            let context = setup_gen();
1734            let conn = context.graph().conn();
1735            let op_conn = context.operations().conn();
1736
1737            let db_uuid = crate::metadata::get_db_uuid(conn);
1738            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
1739                .unwrap();
1740
1741            let (bg_id, _path_id) = setup_block_group(conn);
1742            let old_edges = BlockGroupEdge::edges_for_block_group(conn, &bg_id);
1743
1744            let mut session = start_operation(conn);
1745            // make a blockgroup with an edge from our parent blockgroup
1746            let _ = Sample::create(conn, "new").unwrap();
1747            let new_bg = BlockGroup::create(
1748                conn,
1749                NewBlockGroup {
1750                    collection_name: "test",
1751                    sample_name: "new",
1752                    name: "new-bg",
1753                    ..Default::default()
1754                },
1755            );
1756            let shared_edge = old_edges[0].edge.clone();
1757            BlockGroupEdge::bulk_create(
1758                conn,
1759                &[BlockGroupEdgeData {
1760                    block_group_id: new_bg.id,
1761                    edge_id: shared_edge.id,
1762                    chromosome_index: 0,
1763                    phased: 0,
1764                }],
1765            );
1766            let operation = end_operation(
1767                &context,
1768                &mut session,
1769                &OperationInfo {
1770                    files: vec![],
1771                    description: "test".to_string(),
1772                },
1773                "test",
1774                None,
1775            )
1776            .unwrap();
1777
1778            let dependencies = operation.get_changeset_dependencies(context.workspace());
1779            assert_eq!(dependencies.nodes[0].id, shared_edge.target_node_id);
1780            assert_eq!(dependencies.nodes.len(), 1);
1781            let nodes = Node::query_by_ids(conn, &[shared_edge.target_node_id]);
1782            assert_eq!(dependencies.sequences[0].hash, nodes[0].sequence_hash);
1783            assert_eq!(dependencies.sequences.len(), 1);
1784        }
1785
1786        #[test]
1787        fn test_records_patch_dependencies() {
1788            let context = setup_gen();
1789            let conn = context.graph().conn();
1790            let op_conn = context.operations().conn();
1791
1792            let db_uuid = crate::metadata::get_db_uuid(conn);
1793            crate::files::GenDatabase::create(op_conn, &db_uuid, "test_db", "test_db_path")
1794                .unwrap();
1795
1796            // create some stuff before we attach to our main session that will be required as extra information
1797            let (bg_id, _path_id) = setup_block_group(conn);
1798            let dep_bg = BlockGroup::get_by_id(conn, &bg_id);
1799
1800            let existing_seq = Sequence::new()
1801                .sequence_type("DNA")
1802                .sequence("AAAATTTT")
1803                .save(conn);
1804            let existing_node_id =
1805                Node::create(conn, &existing_seq.hash, &HashId::convert_str("1"));
1806
1807            let mut session = start_operation(conn);
1808
1809            let random_seq = Sequence::new()
1810                .sequence_type("DNA")
1811                .sequence("ATCG")
1812                .save(conn);
1813            let random_node_id = Node::create(conn, &random_seq.hash, &HashId::convert_str("2"));
1814
1815            let new_edge = Edge::create(
1816                conn,
1817                random_node_id,
1818                0,
1819                Strand::Forward,
1820                existing_node_id,
1821                0,
1822                Strand::Forward,
1823            );
1824            let block_group_edge = BlockGroupEdgeData {
1825                block_group_id: bg_id,
1826                edge_id: new_edge.id,
1827                chromosome_index: 0,
1828                phased: 0,
1829            };
1830            BlockGroupEdge::bulk_create(conn, &[block_group_edge]);
1831            let operation = end_operation(
1832                &context,
1833                &mut session,
1834                &OperationInfo {
1835                    files: vec![OperationFile {
1836                        file_path: "test".to_string(),
1837                        file_type: FileTypes::None,
1838                    }],
1839                    description: "test".to_string(),
1840                },
1841                "test",
1842                None,
1843            )
1844            .unwrap();
1845
1846            let dependencies = operation.get_changeset_dependencies(context.workspace());
1847            assert_eq!(dependencies.sequences.len(), 1);
1848            assert_eq!(
1849                dependencies.block_group[0].collection_name,
1850                dep_bg.collection_name
1851            );
1852            assert_eq!(dependencies.block_group[0].name, dep_bg.name);
1853            assert_eq!(dependencies.block_group[0].sample_name, dep_bg.sample_name);
1854        }
1855    }
1856}