sources_core/spec.rs
1//! [`SourceSpec`] — the source's own view of what to build.
2//!
3//! A source needs only a fraction of the top-level `Config`: the indexes it
4//! must keep in sync and the shape of each (root table, field sources, filters).
5//! It needs neither the sink list nor any connection or OpenSearch detail. That
6//! genuine subset is [`SourceSpec`], expressed entirely in [`schema_core`] types
7//! the source already speaks.
8//!
9//! The composition root translates the top-level config into a `SourceSpec` and
10//! hands the backend the spec, so the backend knows nothing about how the
11//! application is configured. `SourceSpec` is expressed purely in `schema-core`
12//! vocabulary and never references the assembled `Config` (which lives a layer
13//! up), so a source is reusable and unit-testable without constructing one, and
14//! the `flusso.toml` shape can evolve without recompiling the backend.
15
16use std::cmp::Ordering;
17use std::collections::{BTreeMap, BTreeSet};
18use std::fmt;
19
20use schema_core::{
21 DatabaseSchema, Field, IndexMapping, IndexName, IndexSchema, RelationKey, TableName,
22};
23
24/// A schema-qualified table, the unit a source needs to reason about coverage
25/// (which tables it must be able to stream). Ordered by `(schema, table)` so a
26/// [`BTreeSet`] of them is deterministic.
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct QualifiedTable {
29 pub schema: DatabaseSchema,
30 pub table: TableName,
31}
32
33impl QualifiedTable {
34 pub fn new(schema: DatabaseSchema, table: TableName) -> Self {
35 Self { schema, table }
36 }
37}
38
39impl fmt::Display for QualifiedTable {
40 /// Renders `schema.table` — the form Postgres accepts in `FOR TABLE` lists.
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "{}.{}", self.schema.as_ref(), self.table.as_ref())
43 }
44}
45
46impl PartialOrd for QualifiedTable {
47 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
48 Some(self.cmp(other))
49 }
50}
51
52impl Ord for QualifiedTable {
53 fn cmp(&self, other: &Self) -> Ordering {
54 (self.schema.as_ref(), self.table.as_ref())
55 .cmp(&(other.schema.as_ref(), other.table.as_ref()))
56 }
57}
58
59/// The enabled indexes a source must build, each paired with its schema.
60///
61/// Everything here is treated as live — disabled indexes are dropped by the
62/// composition root when it translates the config into this spec, so the source
63/// never has to re-check an `enabled` flag.
64#[derive(Debug, Clone, Default)]
65pub struct SourceSpec {
66 indexes: BTreeMap<IndexName, IndexSchema>,
67}
68
69impl SourceSpec {
70 /// Build a spec from an explicit set of `(index, schema)` pairs. Every entry
71 /// is treated as live; the caller (the composition root) is responsible for
72 /// having filtered out disabled indexes when translating from a config.
73 pub fn new(indexes: BTreeMap<IndexName, IndexSchema>) -> Self {
74 Self { indexes }
75 }
76
77 /// Iterate the indexes and their schemas, in index-name order.
78 pub fn indexes(&self) -> impl Iterator<Item = (&IndexName, &IndexSchema)> {
79 self.indexes.iter()
80 }
81
82 pub fn schema(&self, index: &IndexName) -> Option<&IndexSchema> {
83 self.indexes.get(index)
84 }
85
86 /// Project every index into its fully-typed [`IndexMapping`], using only the
87 /// declared schema — the database-free counterpart the engine creates
88 /// indexes from up front.
89 pub fn index_mappings(&self) -> Vec<IndexMapping> {
90 self.indexes
91 .iter()
92 .map(|(name, schema)| schema.resolve(name.clone()))
93 .collect()
94 }
95
96 /// Every table any enabled index reads — each index's root table plus every
97 /// table a join or aggregate (and any `through` junction) pulls from.
98 ///
99 /// Relations carry no schema of their own; the resolver qualifies a related
100 /// table with the *index's* `db_schema` (see the Postgres `resolve` module),
101 /// so this does the same. This is the set a source must be able to stream —
102 /// what [`CaptureProvisioning`](crate::CaptureProvisioning) checks coverage
103 /// against.
104 pub fn all_tables(&self) -> BTreeSet<QualifiedTable> {
105 let mut tables = BTreeSet::new();
106 for schema in self.indexes.values() {
107 tables.insert(QualifiedTable::new(
108 schema.db_schema.clone(),
109 schema.table.clone(),
110 ));
111 collect_relation_tables(&schema.fields, &schema.db_schema, &mut tables);
112 }
113 tables
114 }
115}
116
117/// Walk the field tree, adding every relation target table (and any `through`
118/// junction table) under `db_schema` to `out`.
119fn collect_relation_tables(
120 fields: &[Field],
121 db_schema: &DatabaseSchema,
122 out: &mut BTreeSet<QualifiedTable>,
123) {
124 for field in fields {
125 if let Some(relation) = field.relation() {
126 out.insert(QualifiedTable::new(
127 db_schema.clone(),
128 relation.table().clone(),
129 ));
130 if let RelationKey::Through(through) = relation.key() {
131 out.insert(QualifiedTable::new(
132 db_schema.clone(),
133 through.table.clone(),
134 ));
135 }
136 }
137 collect_relation_tables(field.children(), db_schema, out);
138 }
139}
140
141#[cfg(test)]
142#[allow(clippy::unwrap_used)]
143mod tests;