Skip to main content

sources_core/
spec.rs

1//! [`SourceSpec`] — the source's own view of what to build.
2//!
3//! A source needs only a fraction of the top-level `Config`: the indexes it
4//! must keep in sync and the shape of each (root table, field sources, filters).
5//! It needs neither the sink list nor any connection or OpenSearch detail. That
6//! genuine subset is [`SourceSpec`], expressed entirely in [`schema_core`] types
7//! the source already speaks.
8//!
9//! The composition root translates the top-level config into a `SourceSpec` and
10//! hands the backend the spec, so the backend knows nothing about how the
11//! application is configured. `SourceSpec` is expressed purely in `schema-core`
12//! vocabulary and never references the assembled `Config` (which lives a layer
13//! up), so a source is reusable and unit-testable without constructing one, and
14//! the `flusso.toml` shape can evolve without recompiling the backend.
15
16use std::cmp::Ordering;
17use std::collections::{BTreeMap, BTreeSet};
18use std::fmt;
19
20use schema_core::{
21    DatabaseSchema, Field, IndexMapping, IndexName, IndexSchema, RelationKey, TableName,
22};
23
24/// A schema-qualified table, the unit a source needs to reason about coverage
25/// (which tables it must be able to stream). Ordered by `(schema, table)` so a
26/// [`BTreeSet`] of them is deterministic.
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct QualifiedTable {
29    pub schema: DatabaseSchema,
30    pub table: TableName,
31}
32
33impl QualifiedTable {
34    pub fn new(schema: DatabaseSchema, table: TableName) -> Self {
35        Self { schema, table }
36    }
37}
38
39impl fmt::Display for QualifiedTable {
40    /// Renders `schema.table` — the form Postgres accepts in `FOR TABLE` lists.
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        write!(f, "{}.{}", self.schema.as_ref(), self.table.as_ref())
43    }
44}
45
46impl PartialOrd for QualifiedTable {
47    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
48        Some(self.cmp(other))
49    }
50}
51
52impl Ord for QualifiedTable {
53    fn cmp(&self, other: &Self) -> Ordering {
54        (self.schema.as_ref(), self.table.as_ref())
55            .cmp(&(other.schema.as_ref(), other.table.as_ref()))
56    }
57}
58
59/// The enabled indexes a source must build, each paired with its schema.
60///
61/// Everything here is treated as live — disabled indexes are dropped by the
62/// composition root when it translates the config into this spec, so the source
63/// never has to re-check an `enabled` flag.
64#[derive(Debug, Clone, Default)]
65pub struct SourceSpec {
66    indexes: BTreeMap<IndexName, IndexSchema>,
67}
68
69impl SourceSpec {
70    /// Build a spec from an explicit set of `(index, schema)` pairs. Every entry
71    /// is treated as live; the caller (the composition root) is responsible for
72    /// having filtered out disabled indexes when translating from a config.
73    pub fn new(indexes: BTreeMap<IndexName, IndexSchema>) -> Self {
74        Self { indexes }
75    }
76
77    /// Iterate the indexes and their schemas, in index-name order.
78    pub fn indexes(&self) -> impl Iterator<Item = (&IndexName, &IndexSchema)> {
79        self.indexes.iter()
80    }
81
82    pub fn schema(&self, index: &IndexName) -> Option<&IndexSchema> {
83        self.indexes.get(index)
84    }
85
86    /// Project every index into its fully-typed [`IndexMapping`], using only the
87    /// declared schema — the database-free counterpart the engine creates
88    /// indexes from up front.
89    pub fn index_mappings(&self) -> Vec<IndexMapping> {
90        self.indexes
91            .iter()
92            .map(|(name, schema)| schema.resolve(name.clone()))
93            .collect()
94    }
95
96    /// Every table any enabled index reads — each index's root table plus every
97    /// table a join or aggregate (and any `through` junction) pulls from.
98    ///
99    /// Relations carry no schema of their own; the resolver qualifies a related
100    /// table with the *index's* `db_schema` (see the Postgres `resolve` module),
101    /// so this does the same. This is the set a source must be able to stream —
102    /// what [`CaptureProvisioning`](crate::CaptureProvisioning) checks coverage
103    /// against.
104    pub fn all_tables(&self) -> BTreeSet<QualifiedTable> {
105        let mut tables = BTreeSet::new();
106        for schema in self.indexes.values() {
107            tables.insert(QualifiedTable::new(
108                schema.db_schema.clone(),
109                schema.table.clone(),
110            ));
111            collect_relation_tables(&schema.fields, &schema.db_schema, &mut tables);
112        }
113        tables
114    }
115}
116
117/// Walk the field tree, adding every relation target table (and any `through`
118/// junction table) under `db_schema` to `out`.
119fn collect_relation_tables(
120    fields: &[Field],
121    db_schema: &DatabaseSchema,
122    out: &mut BTreeSet<QualifiedTable>,
123) {
124    for field in fields {
125        if let Some(relation) = field.relation() {
126            out.insert(QualifiedTable::new(
127                db_schema.clone(),
128                relation.table().clone(),
129            ));
130            if let RelationKey::Through(through) = relation.key() {
131                out.insert(QualifiedTable::new(
132                    db_schema.clone(),
133                    through.table.clone(),
134                ));
135            }
136        }
137        collect_relation_tables(field.children(), db_schema, out);
138    }
139}
140
#[cfg(test)]
// Fixtures are built from hard-coded names; if one fails validation the fixture
// itself is wrong, so panicking via `unwrap` is the desired outcome.
#[allow(clippy::unwrap_used)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use schema_core::{
        Column, ColumnName, DatabaseSchema, Field, FieldName, FieldSource, FlussoType, IndexName,
        IndexSchema, Join, JoinKind, Relation, TableName, Through,
    };

    use super::{QualifiedTable, SourceSpec};

    /// Validated index name from a literal.
    fn index_name(name: &str) -> IndexName {
        IndexName::try_new(name).unwrap()
    }

    /// Validated table name from a literal.
    fn table_name(name: &str) -> TableName {
        TableName::try_new(name).unwrap()
    }

    /// Validated column name from a literal.
    fn column_name(name: &str) -> ColumnName {
        ColumnName::try_new(name).unwrap()
    }

    /// Validated field name from a literal.
    fn field_name(name: &str) -> FieldName {
        FieldName::try_new(name).unwrap()
    }

    /// A non-nullable keyword field named `name`, read from the column of the
    /// same name, with no transforms and no default.
    fn column_field(name: &str) -> Field {
        Field {
            field: field_name(name),
            options: Default::default(),
            source: FieldSource::Column(Column {
                column: column_name(name),
                ty: FlussoType::Keyword,
                nullable: false,
                transforms: Vec::new(),
                default: None,
            }),
        }
    }

    /// A join field named after the table it joins (`table`), of the given
    /// `kind`, keyed on `id`, exposing a single child column `child`.
    fn join_field(table: &str, kind: JoinKind, child: &str) -> Field {
        Field {
            field: field_name(table),
            options: Default::default(),
            source: FieldSource::Relation(Relation::Join(Join {
                table: table_name(table),
                kind,
                primary_key: column_name("id"),
                filters: None,
                order_by: None,
                limit: None,
                fields: vec![column_field(child)],
            })),
        }
    }

    /// A one-column schema over `public.<table>`, enough to resolve a mapping.
    fn schema(table: &str) -> IndexSchema {
        IndexSchema {
            version: 1,
            table: table_name(table),
            db_schema: DatabaseSchema::try_new("public").unwrap(),
            primary_key: Some(column_name("id")),
            doc_id: None,
            soft_delete: None,
            filters: None,
            fields: vec![column_field("id")],
        }
    }

    #[test]
    fn accessors_expose_indexes_in_name_order() {
        // Inserted out of order on purpose: the spec must hand them back sorted.
        let mut indexes = BTreeMap::new();
        indexes.insert(index_name("b"), schema("bees"));
        indexes.insert(index_name("a"), schema("ants"));
        let spec = SourceSpec::new(indexes);

        let names: Vec<&str> = spec.indexes().map(|(name, _)| name.as_ref()).collect();
        assert_eq!(names, ["a", "b"]);
        assert!(spec.schema(&index_name("a")).is_some());
        assert!(spec.schema(&index_name("missing")).is_none());

        let mappings = spec.index_mappings();
        assert_eq!(mappings.len(), 2);
        assert_eq!(mappings.first().unwrap().index.as_ref(), "a");
    }

    #[test]
    fn all_tables_collects_roots_relations_and_junctions() {
        // `books` over public.books with a has_many join to `reviews` and a
        // many_to_many to `tags` through the `book_tags` junction.
        let mut books = schema("books");
        books.fields.push(join_field(
            "reviews",
            JoinKind::HasMany {
                foreign_key: column_name("book_id"),
            },
            "body",
        ));
        books.fields.push(join_field(
            "tags",
            JoinKind::ManyToMany {
                through: Through {
                    table: table_name("book_tags"),
                    left_key: column_name("book_id"),
                    right_key: column_name("tag_id"),
                },
            },
            "name",
        ));

        let mut indexes = BTreeMap::new();
        indexes.insert(index_name("books"), books);
        // A second index sharing no tables, to prove the set spans all indexes.
        indexes.insert(index_name("ants"), schema("ants"));
        let spec = SourceSpec::new(indexes);

        let public = DatabaseSchema::try_new("public").unwrap();
        let qt = |t: &str| QualifiedTable::new(public.clone(), table_name(t));
        let expected: BTreeSet<QualifiedTable> = [
            qt("ants"),
            qt("book_tags"),
            qt("books"),
            qt("reviews"),
            qt("tags"),
        ]
        .into_iter()
        .collect();

        assert_eq!(spec.all_tables(), expected);
    }
}