Skip to main content

drasi_lib/config/
schema.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use std::collections::HashSet;
19
20use crate::channels::DispatchMode;
21use crate::indexes::{StorageBackendConfig, StorageBackendRef};
22use crate::recovery::RecoveryPolicy;
23use drasi_core::models::SourceMiddlewareConfig;
24
25// Re-export schema discovery types from their dedicated module for backward compatibility.
26// Consumers can import from either `crate::config` or `crate::schema`.
27pub use crate::schema::{
28    normalize_table_label, GraphNodeSchema, GraphRelationSchema, GraphSchema, NodeSchema,
29    PropertySchema, PropertyType, RelationSchema, SourceSchema,
30};
31
32/// Query language for continuous queries
33///
34/// Drasi supports two query languages for continuous query processing:
35///
36/// # Query Languages
37///
38/// - **Cypher**: Default graph query language with pattern matching
39/// - **GQL**: GraphQL-style queries compiled to Cypher
40///
41/// # Default Behavior
42///
43/// If not specified, queries default to `QueryLanguage::Cypher`.
44///
45/// # Examples
46///
47/// ## Using Cypher (Default)
48///
49/// ```yaml
50/// queries:
51///   - id: active_orders
52///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
53///     queryLanguage: Cypher  # Optional, this is the default
54///     sources: [orders_db]
55/// ```
56///
57/// ## Using GQL
58///
59/// ```yaml
60/// queries:
61///   - id: user_data
62///     query: |
63///       {
64///         users(status: "active") {
65///           id
66///           name
67///           email
68///         }
69///       }
70///     queryLanguage: GQL
71///     sources: [users_db]
72/// ```
73///
74/// # Important Limitations
75///
76/// **Unsupported Clauses**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
77/// queries as they conflict with incremental result computation.
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
79pub enum QueryLanguage {
80    #[default]
81    Cypher,
82    GQL,
83}
84
85/// Source subscription configuration for queries
86///
87/// `SourceSubscriptionConfig` defines how a query subscribes to a specific source,
88/// including any middleware pipeline to apply to changes from that source.
89///
90/// # Fields
91///
92/// - **source_id**: ID of the source to subscribe to
93/// - **nodes**: Optional list of node labels to subscribe to from this source
94/// - **relations**: Optional list of relation labels to subscribe to from this source
95/// - **pipeline**: Optional list of middleware IDs to apply to changes from this source
96///
97/// # Examples
98///
99/// ## Simple Subscription (No Pipeline)
100///
101/// ```yaml
102/// source_subscriptions:
103///   - source_id: orders_db
104///     pipeline: []
105/// ```
106///
107/// ## Subscription with Middleware Pipeline
108///
109/// ```yaml
110/// source_subscriptions:
111///   - source_id: raw_events
112///     pipeline: [decoder, mapper, validator]
113/// ```
114///
115/// ## Subscription with Label Filtering
116///
117/// ```yaml
118/// source_subscriptions:
119///   - source_id: orders_db
120///     nodes: [Order, Customer]
121///     relations: [PLACED_BY]
122///     pipeline: []
123/// ```
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SourceSubscriptionConfig {
126    pub source_id: String,
127    #[serde(default)]
128    pub nodes: Vec<String>,
129    #[serde(default)]
130    pub relations: Vec<String>,
131    #[serde(default)]
132    pub pipeline: Vec<String>,
133}
134
135/// Settings passed to a source when subscribing
136///
137/// `SourceSubscriptionSettings` contains all the information a source needs to
138/// intelligently handle bootstrap and subscription for a query, including the
139/// specific node and relation labels the query is interested in.
140///
141/// # Fields
142///
143/// - **source_id**: ID of the source
144/// - **enable_bootstrap**: Whether to request initial data
145/// - **query_id**: ID of the subscribing query
146/// - **nodes**: Set of node labels the query is interested in from this source
147/// - **relations**: Set of relation labels the query is interested in from this source
148///
149/// # Example
150///
151/// ```ignore
152/// use drasi_lib::config::SourceSubscriptionSettings;
153/// use std::collections::HashSet;
154///
155/// let settings = SourceSubscriptionSettings {
156///     source_id: "orders_db".to_string(),
157///     enable_bootstrap: true,
158///     query_id: "my-query".to_string(),
159///     nodes: ["Order", "Customer"].iter().map(|s| s.to_string()).collect(),
160///     relations: ["PLACED_BY"].iter().map(|s| s.to_string()).collect(),
161///     resume_from: None,
162///     request_position_handle: false,
163///     last_sequence: None,
164/// };
165/// ```
166#[derive(Debug, Clone)]
167pub struct SourceSubscriptionSettings {
168    pub source_id: String,
169    pub enable_bootstrap: bool,
170    pub query_id: String,
171    pub nodes: HashSet<String>,
172    pub relations: HashSet<String>,
173    /// If set, the subscribing query requests events replayed from this source position.
174    /// Contains the opaque position bytes that the source interprets to seek its change stream.
175    /// Only meaningful when the source returns `supports_replay() == true`.
176    pub resume_from: Option<Bytes>,
177    /// If true, the query requests a shared `Arc<AtomicU64>` position handle in the
178    /// `SubscriptionResponse` for reporting its durably-processed position back to the source.
179    pub request_position_handle: bool,
180    /// The last sequence number processed by this query before shutdown.
181    /// Sources use this to ensure their sequence counter continues from where it left off,
182    /// maintaining monotonicity across restarts.
183    pub last_sequence: Option<u64>,
184}
185
186/// Root configuration for drasi-lib
187///
188/// `DrasiLibConfig` is the top-level configuration structure for DrasiLib.
189/// It defines server settings, queries, and storage backends.
190///
191/// # Plugin Architecture
192///
193/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
194/// reactions are passed as owned instances via `with_source()` and `with_reaction()`
195/// on the builder. Only queries can be configured via the builder.
196///
197/// # Configuration Structure
198///
199/// A typical configuration has these sections:
200///
201/// 1. **id**: Unique server identifier (optional, defaults to UUID)
202/// 2. **priority_queue_capacity**: Default capacity for event queues (optional)
203/// 3. **dispatch_buffer_capacity**: Default capacity for dispatch buffers (optional)
204/// 4. **storage_backends**: Storage backend definitions (optional)
205/// 5. **queries**: Continuous queries to process data
206///
207/// # Capacity Settings
208///
209/// - **priority_queue_capacity**: Default capacity for timestamp-ordered event queues in
210///   queries and reactions. Higher values support more out-of-order events but consume
211///   more memory. Default: 10000
212///
213/// - **dispatch_buffer_capacity**: Default capacity for event dispatch channels between
214///   components (sources → queries, queries → reactions). Higher values improve throughput
215///   under load but consume more memory. Default: 1000
216///
217/// Individual components can override these defaults by setting their own capacity values.
218///
219/// # Thread Safety
220///
221/// This struct is `Clone` and can be safely shared across threads.
222///
223/// # Usage
224///
225/// Use `DrasiLib::builder()` to create instances:
226///
227/// ```no_run
228/// use drasi_lib::{DrasiLib, Query};
229///
230/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
231/// let core = DrasiLib::builder()
232///     .with_id("my-server")
233///     .with_query(
234///         Query::cypher("my-query")
235///             .query("MATCH (n) RETURN n")
236///             .from_source("my-source")
237///             .build()
238///     )
239///     .build()
240///     .await?;
241/// # Ok(())
242/// # }
243/// ```
244///
245/// # Validation
246///
247/// Call [`validate()`](DrasiLibConfig::validate) to check:
248/// - Unique query IDs
249/// - Valid storage backend references
250///
251/// Note: Source and reaction validation happens at runtime when instances are added.
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct DrasiLibConfig {
254    /// Unique identifier for this DrasiLib instance (defaults to UUID)
255    #[serde(default = "default_id")]
256    pub id: String,
257    /// Default priority queue capacity for queries and reactions (default: 10000 if not specified)
258    #[serde(default, skip_serializing_if = "Option::is_none")]
259    pub priority_queue_capacity: Option<usize>,
260    /// Default dispatch buffer capacity for sources and queries (default: 1000 if not specified)
261    #[serde(default, skip_serializing_if = "Option::is_none")]
262    pub dispatch_buffer_capacity: Option<usize>,
263    /// Global storage backend definitions that can be referenced by queries
264    #[serde(default)]
265    pub storage_backends: Vec<StorageBackendConfig>,
266    /// Query configurations
267    #[serde(default)]
268    pub queries: Vec<QueryConfig>,
269}
270
271impl Default for DrasiLibConfig {
272    fn default() -> Self {
273        Self {
274            id: default_id(),
275            priority_queue_capacity: None,
276            dispatch_buffer_capacity: None,
277            storage_backends: Vec::new(),
278            queries: Vec::new(),
279        }
280    }
281}
282
283/// Configuration for a continuous query
284///
285/// `QueryConfig` defines a continuous query that processes data changes from sources
286/// and emits incremental result updates. Queries subscribe to one or more sources and
287/// maintain materialized views that update automatically as data changes.
288///
289/// # Query Languages
290///
291/// Queries can be written in either:
292/// - **Cypher**: Default graph pattern matching language
293/// - **GQL**: GraphQL-style queries (compiled to Cypher)
294///
295/// **Important**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
296/// queries as they conflict with incremental result computation.
297///
298/// # Bootstrap Processing
299///
300/// - **enableBootstrap**: Controls whether the query processes initial data (default: true)
301/// - **bootstrapBufferSize**: Event buffer size during bootstrap phase (default: 10000)
302///
303/// During bootstrap, events are buffered to maintain ordering while initial data loads.
304/// After bootstrap completes, queries switch to incremental processing mode.
305///
306/// # Synthetic Joins
307///
308/// Queries can define synthetic relationships between node types from different sources
309/// via the `joins` field. This creates virtual edges based on property equality without
310/// requiring physical relationships in the source data.
311///
312/// # Configuration Fields
313///
314/// - **id**: Unique identifier (referenced by reactions)
315/// - **query**: Query string in specified language
316/// - **queryLanguage**: Cypher or GQL (default: Cypher)
317/// - **sources**: Source IDs to subscribe to
318/// - **auto_start**: Start automatically (default: true)
319/// - **joins**: Optional synthetic join definitions
320/// - **enableBootstrap**: Process initial data (default: true)
321/// - **bootstrapBufferSize**: Buffer size during bootstrap (default: 10000)
322/// - **priority_queue_capacity**: Out-of-order event queue size (overrides global)
323/// - **dispatch_buffer_capacity**: Output buffer size (overrides global)
324/// - **dispatch_mode**: Broadcast or Channel routing
325///
326/// # Examples
327///
328/// ## Basic Cypher Query
329///
330/// ```yaml
331/// queries:
332///   - id: active_orders
333///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
334///     queryLanguage: Cypher  # Optional, this is default
335///     sources: [orders_db]
336///     auto_start: true
337///     enableBootstrap: true
338///     bootstrapBufferSize: 10000
339/// ```
340///
341/// ## Query with Multiple Sources
342///
343/// ```yaml
344/// queries:
345///   - id: order_customer_join
346///     query: |
347///       MATCH (o:Order)-[:BELONGS_TO]->(c:Customer)
348///       WHERE o.status = 'active'
349///       RETURN o, c
350///     sources: [orders_db, customers_db]
351/// ```
352///
353/// ## Query with Synthetic Joins
354///
355/// ```yaml
356/// queries:
357///   - id: synthetic_join_query
358///     query: |
359///       MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
360///       RETURN o.id, c.name
361///     sources: [orders_db, customers_db]
362///     joins:
363///       - id: CUSTOMER              # Relationship type in query
364///         keys:
365///           - label: Order
366///             property: customer_id
367///           - label: Customer
368///             property: id
369/// ```
370///
371/// ## High-Throughput Query
372///
373/// ```yaml
374/// queries:
375///   - id: high_volume_processing
376///     query: "MATCH (n:Event) WHERE n.timestamp > timestamp() - 60000 RETURN n"
377///     sources: [event_stream]
378///     priority_queue_capacity: 100000  # Large queue for many out-of-order events
379///     dispatch_buffer_capacity: 10000  # Large output buffer
380///     bootstrapBufferSize: 50000       # Large bootstrap buffer
381/// ```
382///
383/// ## GQL Query
384///
385/// ```yaml
386/// queries:
387///   - id: gql_users
388///     query: |
389///       {
390///         users(status: "active") {
391///           id
392///           name
393///           email
394///         }
395///       }
396///     queryLanguage: GQL
397///     sources: [users_db]
398/// ```
399#[derive(Debug, Clone, Serialize, Deserialize)]
400pub struct QueryConfig {
401    /// Unique identifier for the query
402    pub id: String,
403    /// Query string (Cypher or GQL depending on query_language)
404    pub query: String,
405    /// Query language to use (default: Cypher)
406    #[serde(default, rename = "queryLanguage")]
407    pub query_language: QueryLanguage,
408    /// Middleware configurations for this query
409    #[serde(default)]
410    pub middleware: Vec<SourceMiddlewareConfig>,
411    /// Source subscriptions with optional middleware pipelines
412    #[serde(default)]
413    pub sources: Vec<SourceSubscriptionConfig>,
414    /// Whether to automatically start this query (default: true)
415    #[serde(default = "default_auto_start")]
416    pub auto_start: bool,
417    /// Optional synthetic joins for the query
418    #[serde(skip_serializing_if = "Option::is_none")]
419    pub joins: Option<Vec<QueryJoinConfig>>,
420    /// Whether to enable bootstrap (default: true)
421    #[serde(default = "default_enable_bootstrap", rename = "enableBootstrap")]
422    pub enable_bootstrap: bool,
423    /// Maximum number of events to buffer during bootstrap (default: 10000)
424    #[serde(
425        default = "default_bootstrap_buffer_size",
426        rename = "bootstrapBufferSize"
427    )]
428    pub bootstrap_buffer_size: usize,
429    /// Priority queue capacity for this query (default: server global, or 10000 if not specified)
430    #[serde(default, skip_serializing_if = "Option::is_none")]
431    pub priority_queue_capacity: Option<usize>,
432    /// Dispatch buffer capacity for this query (default: server global, or 1000 if not specified)
433    #[serde(default, skip_serializing_if = "Option::is_none")]
434    pub dispatch_buffer_capacity: Option<usize>,
435    /// Dispatch mode for this query (default: Channel)
436    #[serde(default, skip_serializing_if = "Option::is_none")]
437    pub dispatch_mode: Option<DispatchMode>,
438    /// Storage backend for this query (default: in-memory)
439    /// Can reference a named backend or provide inline configuration
440    #[serde(skip_serializing_if = "Option::is_none")]
441    pub storage_backend: Option<StorageBackendRef>,
442    /// Recovery policy when a source cannot honor a requested resume position.
443    /// `None` inherits the global default (itself defaulting to `Strict`).
444    /// See [`RecoveryPolicy`](crate::RecoveryPolicy).
445    #[serde(
446        default,
447        skip_serializing_if = "Option::is_none",
448        rename = "recoveryPolicy"
449    )]
450    pub recovery_policy: Option<RecoveryPolicy>,
451    /// Maximum number of recent QueryResult emissions retained in the outbox ring buffer.
452    /// Reactions use this buffer for gap recovery via `fetch_outbox`.
453    /// Default: 1000. Minimum: 1 (values below 1 are clamped to 1).
454    #[serde(default = "default_outbox_capacity", rename = "outboxCapacity")]
455    pub outbox_capacity: usize,
456    /// Maximum time in seconds that `fetch_snapshot` / `fetch_outbox` will wait for
457    /// the query to finish bootstrapping before returning `FetchError::TimedOut`.
458    /// Default: 300 (5 minutes).
459    #[serde(
460        default = "default_bootstrap_timeout_secs",
461        rename = "bootstrapTimeoutSecs"
462    )]
463    pub bootstrap_timeout_secs: u64,
464}
465
466/// Synthetic join configuration for queries
467///
468/// `QueryJoinConfig` defines a virtual relationship between node types from different
469/// sources. This allows queries to join data without requiring physical relationships
470/// in the source systems.
471///
472/// # Join Semantics
473///
474/// Joins create synthetic edges by matching property values across nodes. The `id`
475/// field specifies the relationship type used in the query's MATCH pattern, and `keys`
476/// define which properties to match.
477///
478/// # Examples
479///
480/// ## Simple Join on Single Property
481///
482/// ```yaml
483/// joins:
484///   - id: CUSTOMER              # Use in query: MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
485///     keys:
486///       - label: Order
487///         property: customer_id  # Order.customer_id
488///       - label: Customer
489///         property: id           # Customer.id
490///       # Creates edge when Order.customer_id == Customer.id
491/// ```
492///
493/// ## Multi-Source Join
494///
495/// ```yaml
496/// joins:
497///   - id: ASSIGNED_TO
498///     keys:
499///       - label: Task
500///         property: assignee_id
501///       - label: User
502///         property: user_id
503/// ```
504#[derive(Debug, Clone, Serialize, Deserialize)]
505pub struct QueryJoinConfig {
506    /// Unique identifier for the join (should match relationship type in query)
507    pub id: String,
508    /// Keys defining the join relationship
509    pub keys: Vec<QueryJoinKeyConfig>,
510}
511
512/// Join key specification for synthetic joins
513///
514/// `QueryJoinKeyConfig` specifies one side of a join condition by identifying
515/// a node label and the property to use for matching.
516///
517/// # Example
518///
519/// For joining orders to customers:
520///
521/// ```yaml
522/// keys:
523///   - label: Order
524///     property: customer_id
525///   - label: Customer
526///     property: id
527/// ```
528///
529/// This creates an edge when `Order.customer_id == Customer.id`.
530#[derive(Debug, Clone, Serialize, Deserialize)]
531pub struct QueryJoinKeyConfig {
532    /// Node label to match
533    pub label: String,
534    /// Property to use for joining
535    pub property: String,
536}
537
538impl DrasiLibConfig {
539    /// Validate configuration consistency and references
540    ///
541    /// Performs comprehensive validation checks:
542    /// - Ensures all query IDs are unique
543    /// - Validates storage backend configurations
544    ///
545    /// Note: Source and reaction validation happens at runtime when instances are added,
546    /// since drasi-lib has no knowledge of specific plugin configurations.
547    ///
548    /// # Errors
549    ///
550    /// Returns error if validation fails with a description of the problem.
551    pub fn validate(&self) -> Result<()> {
552        // Validate unique query ids
553        let mut query_ids = std::collections::HashSet::new();
554        for query in &self.queries {
555            if !query_ids.insert(&query.id) {
556                return Err(anyhow::anyhow!("Duplicate query id: '{}'", query.id));
557            }
558        }
559
560        // Validate unique storage backend ids
561        let mut storage_backend_ids = std::collections::HashSet::new();
562        for backend in &self.storage_backends {
563            if !storage_backend_ids.insert(&backend.id) {
564                return Err(anyhow::anyhow!(
565                    "Duplicate storage backend id: '{}'",
566                    backend.id
567                ));
568            }
569            // Validate backend configuration
570            backend.spec.validate().map_err(|e| {
571                anyhow::anyhow!(
572                    "Storage backend '{}' has invalid configuration: {}",
573                    backend.id,
574                    e
575                )
576            })?;
577        }
578
579        // Validate storage backend references in queries
580        for query in &self.queries {
581            if let Some(backend_ref) = &query.storage_backend {
582                match backend_ref {
583                    StorageBackendRef::Named(backend_id) => {
584                        if !storage_backend_ids.contains(backend_id) {
585                            return Err(anyhow::anyhow!(
586                                "Query '{}' references unknown storage backend: '{}'",
587                                query.id,
588                                backend_id
589                            ));
590                        }
591                    }
592                    StorageBackendRef::Inline(spec) => {
593                        // Validate inline backend configuration
594                        spec.validate().map_err(|e| {
595                            anyhow::anyhow!(
596                                "Query '{}' has invalid inline storage backend configuration: {}",
597                                query.id,
598                                e
599                            )
600                        })?;
601                    }
602                }
603            }
604        }
605
606        Ok(())
607    }
608}
609
610fn default_id() -> String {
611    uuid::Uuid::new_v4().to_string()
612}
613
614fn default_auto_start() -> bool {
615    true
616}
617
618fn default_enable_bootstrap() -> bool {
619    true
620}
621
622fn default_bootstrap_buffer_size() -> usize {
623    10000
624}
625
626fn default_outbox_capacity() -> usize {
627    crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY
628}
629
630fn default_bootstrap_timeout_secs() -> u64 {
631    300
632}
633
634// Conversion implementations for QueryJoin types
635impl From<QueryJoinKeyConfig> for drasi_core::models::QueryJoinKey {
636    fn from(config: QueryJoinKeyConfig) -> Self {
637        drasi_core::models::QueryJoinKey {
638            label: config.label,
639            property: config.property,
640        }
641    }
642}
643
644impl From<QueryJoinConfig> for drasi_core::models::QueryJoin {
645    fn from(config: QueryJoinConfig) -> Self {
646        drasi_core::models::QueryJoin {
647            id: config.id,
648            keys: config.keys.into_iter().map(|k| k.into()).collect(),
649        }
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656    use std::collections::BTreeSet;
657
658    #[test]
659    fn btreeset_sources_maintains_sorted_order_and_deduplication() {
660        let mut graph = GraphSchema::default();
661        let schema = SourceSchema {
662            nodes: vec![NodeSchema {
663                label: "Item".to_string(),
664                properties: Vec::new(),
665            }],
666            relations: Vec::new(),
667        };
668
669        graph.merge_source_schema("charlie", &schema);
670        graph.merge_source_schema("alpha", &schema);
671        graph.merge_source_schema("bravo", &schema);
672        graph.merge_source_schema("alpha", &schema); // duplicate
673
674        let item = graph.nodes.get("Item").unwrap();
675        let sources: Vec<_> = item.sources.iter().cloned().collect();
676        assert_eq!(sources, vec!["alpha", "bravo", "charlie"]);
677    }
678
679    #[test]
680    fn normalize_table_label_strips_schema_prefix() {
681        assert_eq!(normalize_table_label("public.users"), "users");
682        assert_eq!(normalize_table_label("dbo.orders"), "orders");
683        assert_eq!(normalize_table_label("customers"), "customers");
684    }
685
686    #[test]
687    fn merge_source_schema_combines_properties() {
688        let mut graph = GraphSchema::default();
689
690        let schema_a = SourceSchema {
691            nodes: vec![NodeSchema {
692                label: "Sensor".to_string(),
693                properties: vec![PropertySchema::new("temperature")],
694            }],
695            relations: Vec::new(),
696        };
697        let schema_b = SourceSchema {
698            nodes: vec![NodeSchema {
699                label: "Sensor".to_string(),
700                properties: vec![
701                    PropertySchema {
702                        name: "temperature".to_string(),
703                        data_type: Some(PropertyType::Float),
704                        description: None,
705                    },
706                    PropertySchema::new("humidity"),
707                ],
708            }],
709            relations: Vec::new(),
710        };
711
712        graph.merge_source_schema("src-a", &schema_a);
713        graph.merge_source_schema("src-b", &schema_b);
714
715        let sensor = graph.nodes.get("Sensor").unwrap();
716        assert_eq!(
717            sensor.sources,
718            BTreeSet::from(["src-a".to_string(), "src-b".to_string()])
719        );
720        assert_eq!(sensor.properties.len(), 2);
721
722        // temperature should have gotten the type from src-b
723        let temp = sensor
724            .properties
725            .iter()
726            .find(|p| p.name == "temperature")
727            .unwrap();
728        assert_eq!(temp.data_type, Some(PropertyType::Float));
729    }
730}