drasi_lib/config/schema.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use std::collections::HashSet;
19
20use crate::channels::DispatchMode;
21use crate::indexes::{StorageBackendConfig, StorageBackendRef};
22use crate::recovery::RecoveryPolicy;
23use drasi_core::models::SourceMiddlewareConfig;
24
25// Re-export schema discovery types from their dedicated module for backward compatibility.
26// Consumers can import from either `crate::config` or `crate::schema`.
27pub use crate::schema::{
28 normalize_table_label, GraphNodeSchema, GraphRelationSchema, GraphSchema, NodeSchema,
29 PropertySchema, PropertyType, RelationSchema, SourceSchema,
30};
31
32/// Query language for continuous queries
33///
34/// Drasi supports two query languages for continuous query processing:
35///
36/// # Query Languages
37///
38/// - **Cypher**: Default graph query language with pattern matching
39/// - **GQL**: GraphQL-style queries compiled to Cypher
40///
41/// # Default Behavior
42///
43/// If not specified, queries default to `QueryLanguage::Cypher`.
44///
45/// # Examples
46///
47/// ## Using Cypher (Default)
48///
49/// ```yaml
50/// queries:
51/// - id: active_orders
52/// query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
53/// queryLanguage: Cypher # Optional, this is the default
54/// sources: [orders_db]
55/// ```
56///
57/// ## Using GQL
58///
59/// ```yaml
60/// queries:
61/// - id: user_data
62/// query: |
63/// {
64/// users(status: "active") {
65/// id
66/// name
67/// email
68/// }
69/// }
70/// queryLanguage: GQL
71/// sources: [users_db]
72/// ```
73///
74/// # Important Limitations
75///
76/// **Unsupported Clauses**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
77/// queries as they conflict with incremental result computation.
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
79pub enum QueryLanguage {
80 #[default]
81 Cypher,
82 GQL,
83}
84
85/// Source subscription configuration for queries
86///
87/// `SourceSubscriptionConfig` defines how a query subscribes to a specific source,
88/// including any middleware pipeline to apply to changes from that source.
89///
90/// # Fields
91///
92/// - **source_id**: ID of the source to subscribe to
93/// - **nodes**: Optional list of node labels to subscribe to from this source
94/// - **relations**: Optional list of relation labels to subscribe to from this source
95/// - **pipeline**: Optional list of middleware IDs to apply to changes from this source
96///
97/// # Examples
98///
99/// ## Simple Subscription (No Pipeline)
100///
101/// ```yaml
102/// source_subscriptions:
103/// - source_id: orders_db
104/// pipeline: []
105/// ```
106///
107/// ## Subscription with Middleware Pipeline
108///
109/// ```yaml
110/// source_subscriptions:
111/// - source_id: raw_events
112/// pipeline: [decoder, mapper, validator]
113/// ```
114///
115/// ## Subscription with Label Filtering
116///
117/// ```yaml
118/// source_subscriptions:
119/// - source_id: orders_db
120/// nodes: [Order, Customer]
121/// relations: [PLACED_BY]
122/// pipeline: []
123/// ```
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SourceSubscriptionConfig {
126 pub source_id: String,
127 #[serde(default)]
128 pub nodes: Vec<String>,
129 #[serde(default)]
130 pub relations: Vec<String>,
131 #[serde(default)]
132 pub pipeline: Vec<String>,
133}
134
135/// Settings passed to a source when subscribing
136///
137/// `SourceSubscriptionSettings` contains all the information a source needs to
138/// intelligently handle bootstrap and subscription for a query, including the
139/// specific node and relation labels the query is interested in.
140///
141/// # Fields
142///
143/// - **source_id**: ID of the source
144/// - **enable_bootstrap**: Whether to request initial data
145/// - **query_id**: ID of the subscribing query
146/// - **nodes**: Set of node labels the query is interested in from this source
147/// - **relations**: Set of relation labels the query is interested in from this source
148///
149/// # Example
150///
151/// ```ignore
152/// use drasi_lib::config::SourceSubscriptionSettings;
153/// use std::collections::HashSet;
154///
155/// let settings = SourceSubscriptionSettings {
156/// source_id: "orders_db".to_string(),
157/// enable_bootstrap: true,
158/// query_id: "my-query".to_string(),
159/// nodes: ["Order", "Customer"].iter().map(|s| s.to_string()).collect(),
160/// relations: ["PLACED_BY"].iter().map(|s| s.to_string()).collect(),
161/// resume_from: None,
162/// request_position_handle: false,
163/// last_sequence: None,
164/// };
165/// ```
166#[derive(Debug, Clone)]
167pub struct SourceSubscriptionSettings {
168 pub source_id: String,
169 pub enable_bootstrap: bool,
170 pub query_id: String,
171 pub nodes: HashSet<String>,
172 pub relations: HashSet<String>,
173 /// If set, the subscribing query requests events replayed from this source position.
174 /// Contains the opaque position bytes that the source interprets to seek its change stream.
175 /// Only meaningful when the source returns `supports_replay() == true`.
176 pub resume_from: Option<Bytes>,
177 /// If true, the query requests a shared `Arc<AtomicU64>` position handle in the
178 /// `SubscriptionResponse` for reporting its durably-processed position back to the source.
179 pub request_position_handle: bool,
180 /// The last sequence number processed by this query before shutdown.
181 /// Sources use this to ensure their sequence counter continues from where it left off,
182 /// maintaining monotonicity across restarts.
183 pub last_sequence: Option<u64>,
184}
185
186/// Root configuration for drasi-lib
187///
188/// `DrasiLibConfig` is the top-level configuration structure for DrasiLib.
189/// It defines server settings, queries, and storage backends.
190///
191/// # Plugin Architecture
192///
193/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
194/// reactions are passed as owned instances via `with_source()` and `with_reaction()`
195/// on the builder. Only queries can be configured via the builder.
196///
197/// # Configuration Structure
198///
199/// A typical configuration has these sections:
200///
201/// 1. **id**: Unique server identifier (optional, defaults to UUID)
202/// 2. **priority_queue_capacity**: Default capacity for event queues (optional)
203/// 3. **dispatch_buffer_capacity**: Default capacity for dispatch buffers (optional)
204/// 4. **storage_backends**: Storage backend definitions (optional)
205/// 5. **queries**: Continuous queries to process data
206///
207/// # Capacity Settings
208///
209/// - **priority_queue_capacity**: Default capacity for timestamp-ordered event queues in
210/// queries and reactions. Higher values support more out-of-order events but consume
211/// more memory. Default: 10000
212///
213/// - **dispatch_buffer_capacity**: Default capacity for event dispatch channels between
214/// components (sources → queries, queries → reactions). Higher values improve throughput
215/// under load but consume more memory. Default: 1000
216///
217/// Individual components can override these defaults by setting their own capacity values.
218///
219/// # Thread Safety
220///
221/// This struct is `Clone` and can be safely shared across threads.
222///
223/// # Usage
224///
225/// Use `DrasiLib::builder()` to create instances:
226///
227/// ```no_run
228/// use drasi_lib::{DrasiLib, Query};
229///
230/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
231/// let core = DrasiLib::builder()
232/// .with_id("my-server")
233/// .with_query(
234/// Query::cypher("my-query")
235/// .query("MATCH (n) RETURN n")
236/// .from_source("my-source")
237/// .build()
238/// )
239/// .build()
240/// .await?;
241/// # Ok(())
242/// # }
243/// ```
244///
245/// # Validation
246///
247/// Call [`validate()`](DrasiLibConfig::validate) to check:
248/// - Unique query IDs
249/// - Valid storage backend references
250///
251/// Note: Source and reaction validation happens at runtime when instances are added.
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct DrasiLibConfig {
254 /// Unique identifier for this DrasiLib instance (defaults to UUID)
255 #[serde(default = "default_id")]
256 pub id: String,
257 /// Default priority queue capacity for queries and reactions (default: 10000 if not specified)
258 #[serde(default, skip_serializing_if = "Option::is_none")]
259 pub priority_queue_capacity: Option<usize>,
260 /// Default dispatch buffer capacity for sources and queries (default: 1000 if not specified)
261 #[serde(default, skip_serializing_if = "Option::is_none")]
262 pub dispatch_buffer_capacity: Option<usize>,
263 /// Global storage backend definitions that can be referenced by queries
264 #[serde(default)]
265 pub storage_backends: Vec<StorageBackendConfig>,
266 /// Query configurations
267 #[serde(default)]
268 pub queries: Vec<QueryConfig>,
269}
270
271impl Default for DrasiLibConfig {
272 fn default() -> Self {
273 Self {
274 id: default_id(),
275 priority_queue_capacity: None,
276 dispatch_buffer_capacity: None,
277 storage_backends: Vec::new(),
278 queries: Vec::new(),
279 }
280 }
281}
282
283/// Configuration for a continuous query
284///
285/// `QueryConfig` defines a continuous query that processes data changes from sources
286/// and emits incremental result updates. Queries subscribe to one or more sources and
287/// maintain materialized views that update automatically as data changes.
288///
289/// # Query Languages
290///
291/// Queries can be written in either:
292/// - **Cypher**: Default graph pattern matching language
293/// - **GQL**: GraphQL-style queries (compiled to Cypher)
294///
295/// **Important**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
296/// queries as they conflict with incremental result computation.
297///
298/// # Bootstrap Processing
299///
300/// - **enableBootstrap**: Controls whether the query processes initial data (default: true)
301/// - **bootstrapBufferSize**: Event buffer size during bootstrap phase (default: 10000)
302///
303/// During bootstrap, events are buffered to maintain ordering while initial data loads.
304/// After bootstrap completes, queries switch to incremental processing mode.
305///
306/// # Synthetic Joins
307///
308/// Queries can define synthetic relationships between node types from different sources
309/// via the `joins` field. This creates virtual edges based on property equality without
310/// requiring physical relationships in the source data.
311///
312/// # Configuration Fields
313///
314/// - **id**: Unique identifier (referenced by reactions)
315/// - **query**: Query string in specified language
316/// - **queryLanguage**: Cypher or GQL (default: Cypher)
317/// - **sources**: Source IDs to subscribe to
318/// - **auto_start**: Start automatically (default: true)
319/// - **joins**: Optional synthetic join definitions
320/// - **enableBootstrap**: Process initial data (default: true)
321/// - **bootstrapBufferSize**: Buffer size during bootstrap (default: 10000)
322/// - **priority_queue_capacity**: Out-of-order event queue size (overrides global)
323/// - **dispatch_buffer_capacity**: Output buffer size (overrides global)
324/// - **dispatch_mode**: Broadcast or Channel routing
325///
326/// # Examples
327///
328/// ## Basic Cypher Query
329///
330/// ```yaml
331/// queries:
332/// - id: active_orders
333/// query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
334/// queryLanguage: Cypher # Optional, this is default
335/// sources: [orders_db]
336/// auto_start: true
337/// enableBootstrap: true
338/// bootstrapBufferSize: 10000
339/// ```
340///
341/// ## Query with Multiple Sources
342///
343/// ```yaml
344/// queries:
345/// - id: order_customer_join
346/// query: |
347/// MATCH (o:Order)-[:BELONGS_TO]->(c:Customer)
348/// WHERE o.status = 'active'
349/// RETURN o, c
350/// sources: [orders_db, customers_db]
351/// ```
352///
353/// ## Query with Synthetic Joins
354///
355/// ```yaml
356/// queries:
357/// - id: synthetic_join_query
358/// query: |
359/// MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
360/// RETURN o.id, c.name
361/// sources: [orders_db, customers_db]
362/// joins:
363/// - id: CUSTOMER # Relationship type in query
364/// keys:
365/// - label: Order
366/// property: customer_id
367/// - label: Customer
368/// property: id
369/// ```
370///
371/// ## High-Throughput Query
372///
373/// ```yaml
374/// queries:
375/// - id: high_volume_processing
376/// query: "MATCH (n:Event) WHERE n.timestamp > timestamp() - 60000 RETURN n"
377/// sources: [event_stream]
378/// priority_queue_capacity: 100000 # Large queue for many out-of-order events
379/// dispatch_buffer_capacity: 10000 # Large output buffer
380/// bootstrapBufferSize: 50000 # Large bootstrap buffer
381/// ```
382///
383/// ## GQL Query
384///
385/// ```yaml
386/// queries:
387/// - id: gql_users
388/// query: |
389/// {
390/// users(status: "active") {
391/// id
392/// name
393/// email
394/// }
395/// }
396/// queryLanguage: GQL
397/// sources: [users_db]
398/// ```
399#[derive(Debug, Clone, Serialize, Deserialize)]
400pub struct QueryConfig {
401 /// Unique identifier for the query
402 pub id: String,
403 /// Query string (Cypher or GQL depending on query_language)
404 pub query: String,
405 /// Query language to use (default: Cypher)
406 #[serde(default, rename = "queryLanguage")]
407 pub query_language: QueryLanguage,
408 /// Middleware configurations for this query
409 #[serde(default)]
410 pub middleware: Vec<SourceMiddlewareConfig>,
411 /// Source subscriptions with optional middleware pipelines
412 #[serde(default)]
413 pub sources: Vec<SourceSubscriptionConfig>,
414 /// Whether to automatically start this query (default: true)
415 #[serde(default = "default_auto_start")]
416 pub auto_start: bool,
417 /// Optional synthetic joins for the query
418 #[serde(skip_serializing_if = "Option::is_none")]
419 pub joins: Option<Vec<QueryJoinConfig>>,
420 /// Whether to enable bootstrap (default: true)
421 #[serde(default = "default_enable_bootstrap", rename = "enableBootstrap")]
422 pub enable_bootstrap: bool,
423 /// Maximum number of events to buffer during bootstrap (default: 10000)
424 #[serde(
425 default = "default_bootstrap_buffer_size",
426 rename = "bootstrapBufferSize"
427 )]
428 pub bootstrap_buffer_size: usize,
429 /// Priority queue capacity for this query (default: server global, or 10000 if not specified)
430 #[serde(default, skip_serializing_if = "Option::is_none")]
431 pub priority_queue_capacity: Option<usize>,
432 /// Dispatch buffer capacity for this query (default: server global, or 1000 if not specified)
433 #[serde(default, skip_serializing_if = "Option::is_none")]
434 pub dispatch_buffer_capacity: Option<usize>,
435 /// Dispatch mode for this query (default: Channel)
436 #[serde(default, skip_serializing_if = "Option::is_none")]
437 pub dispatch_mode: Option<DispatchMode>,
438 /// Storage backend for this query (default: in-memory)
439 /// Can reference a named backend or provide inline configuration
440 #[serde(skip_serializing_if = "Option::is_none")]
441 pub storage_backend: Option<StorageBackendRef>,
442 /// Recovery policy when a source cannot honor a requested resume position.
443 /// `None` inherits the global default (itself defaulting to `Strict`).
444 /// See [`RecoveryPolicy`](crate::RecoveryPolicy).
445 #[serde(
446 default,
447 skip_serializing_if = "Option::is_none",
448 rename = "recoveryPolicy"
449 )]
450 pub recovery_policy: Option<RecoveryPolicy>,
451 /// Maximum number of recent QueryResult emissions retained in the outbox ring buffer.
452 /// Reactions use this buffer for gap recovery via `fetch_outbox`.
453 /// Default: 1000. Minimum: 1 (values below 1 are clamped to 1).
454 #[serde(default = "default_outbox_capacity", rename = "outboxCapacity")]
455 pub outbox_capacity: usize,
456 /// Maximum time in seconds that `fetch_snapshot` / `fetch_outbox` will wait for
457 /// the query to finish bootstrapping before returning `FetchError::TimedOut`.
458 /// Default: 300 (5 minutes).
459 #[serde(
460 default = "default_bootstrap_timeout_secs",
461 rename = "bootstrapTimeoutSecs"
462 )]
463 pub bootstrap_timeout_secs: u64,
464}
465
466/// Synthetic join configuration for queries
467///
468/// `QueryJoinConfig` defines a virtual relationship between node types from different
469/// sources. This allows queries to join data without requiring physical relationships
470/// in the source systems.
471///
472/// # Join Semantics
473///
474/// Joins create synthetic edges by matching property values across nodes. The `id`
475/// field specifies the relationship type used in the query's MATCH pattern, and `keys`
476/// define which properties to match.
477///
478/// # Examples
479///
480/// ## Simple Join on Single Property
481///
482/// ```yaml
483/// joins:
484/// - id: CUSTOMER # Use in query: MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
485/// keys:
486/// - label: Order
487/// property: customer_id # Order.customer_id
488/// - label: Customer
489/// property: id # Customer.id
490/// # Creates edge when Order.customer_id == Customer.id
491/// ```
492///
493/// ## Multi-Source Join
494///
495/// ```yaml
496/// joins:
497/// - id: ASSIGNED_TO
498/// keys:
499/// - label: Task
500/// property: assignee_id
501/// - label: User
502/// property: user_id
503/// ```
504#[derive(Debug, Clone, Serialize, Deserialize)]
505pub struct QueryJoinConfig {
506 /// Unique identifier for the join (should match relationship type in query)
507 pub id: String,
508 /// Keys defining the join relationship
509 pub keys: Vec<QueryJoinKeyConfig>,
510}
511
512/// Join key specification for synthetic joins
513///
514/// `QueryJoinKeyConfig` specifies one side of a join condition by identifying
515/// a node label and the property to use for matching.
516///
517/// # Example
518///
519/// For joining orders to customers:
520///
521/// ```yaml
522/// keys:
523/// - label: Order
524/// property: customer_id
525/// - label: Customer
526/// property: id
527/// ```
528///
529/// This creates an edge when `Order.customer_id == Customer.id`.
530#[derive(Debug, Clone, Serialize, Deserialize)]
531pub struct QueryJoinKeyConfig {
532 /// Node label to match
533 pub label: String,
534 /// Property to use for joining
535 pub property: String,
536}
537
538impl DrasiLibConfig {
539 /// Validate configuration consistency and references
540 ///
541 /// Performs comprehensive validation checks:
542 /// - Ensures all query IDs are unique
543 /// - Validates storage backend configurations
544 ///
545 /// Note: Source and reaction validation happens at runtime when instances are added,
546 /// since drasi-lib has no knowledge of specific plugin configurations.
547 ///
548 /// # Errors
549 ///
550 /// Returns error if validation fails with a description of the problem.
551 pub fn validate(&self) -> Result<()> {
552 // Validate unique query ids
553 let mut query_ids = std::collections::HashSet::new();
554 for query in &self.queries {
555 if !query_ids.insert(&query.id) {
556 return Err(anyhow::anyhow!("Duplicate query id: '{}'", query.id));
557 }
558 }
559
560 // Validate unique storage backend ids
561 let mut storage_backend_ids = std::collections::HashSet::new();
562 for backend in &self.storage_backends {
563 if !storage_backend_ids.insert(&backend.id) {
564 return Err(anyhow::anyhow!(
565 "Duplicate storage backend id: '{}'",
566 backend.id
567 ));
568 }
569 // Validate backend configuration
570 backend.spec.validate().map_err(|e| {
571 anyhow::anyhow!(
572 "Storage backend '{}' has invalid configuration: {}",
573 backend.id,
574 e
575 )
576 })?;
577 }
578
579 // Validate storage backend references in queries
580 for query in &self.queries {
581 if let Some(backend_ref) = &query.storage_backend {
582 match backend_ref {
583 StorageBackendRef::Named(backend_id) => {
584 if !storage_backend_ids.contains(backend_id) {
585 return Err(anyhow::anyhow!(
586 "Query '{}' references unknown storage backend: '{}'",
587 query.id,
588 backend_id
589 ));
590 }
591 }
592 StorageBackendRef::Inline(spec) => {
593 // Validate inline backend configuration
594 spec.validate().map_err(|e| {
595 anyhow::anyhow!(
596 "Query '{}' has invalid inline storage backend configuration: {}",
597 query.id,
598 e
599 )
600 })?;
601 }
602 }
603 }
604 }
605
606 Ok(())
607 }
608}
609
610fn default_id() -> String {
611 uuid::Uuid::new_v4().to_string()
612}
613
/// Serde default for `QueryConfig::auto_start`: queries start automatically.
fn default_auto_start() -> bool {
    const AUTO_START_BY_DEFAULT: bool = true;
    AUTO_START_BY_DEFAULT
}
617
/// Serde default for `QueryConfig::enable_bootstrap`: bootstrap is enabled.
fn default_enable_bootstrap() -> bool {
    const BOOTSTRAP_BY_DEFAULT: bool = true;
    BOOTSTRAP_BY_DEFAULT
}
621
/// Serde default for `QueryConfig::bootstrap_buffer_size`: 10000 buffered events.
fn default_bootstrap_buffer_size() -> usize {
    const DEFAULT_BOOTSTRAP_BUFFER_SIZE: usize = 10_000;
    DEFAULT_BOOTSTRAP_BUFFER_SIZE
}
625
626fn default_outbox_capacity() -> usize {
627 crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY
628}
629
/// Serde default for `QueryConfig::bootstrap_timeout_secs`: 300 seconds (5 minutes).
fn default_bootstrap_timeout_secs() -> u64 {
    const MINUTES: u64 = 5;
    const SECS_PER_MINUTE: u64 = 60;
    MINUTES * SECS_PER_MINUTE
}
633
634// Conversion implementations for QueryJoin types
635impl From<QueryJoinKeyConfig> for drasi_core::models::QueryJoinKey {
636 fn from(config: QueryJoinKeyConfig) -> Self {
637 drasi_core::models::QueryJoinKey {
638 label: config.label,
639 property: config.property,
640 }
641 }
642}
643
644impl From<QueryJoinConfig> for drasi_core::models::QueryJoin {
645 fn from(config: QueryJoinConfig) -> Self {
646 drasi_core::models::QueryJoin {
647 id: config.id,
648 keys: config.keys.into_iter().map(|k| k.into()).collect(),
649 }
650 }
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    /// Builds a source schema containing one node label and no relations.
    fn single_node_schema(label: &str, properties: Vec<PropertySchema>) -> SourceSchema {
        let node = NodeSchema {
            label: label.to_string(),
            properties,
        };
        SourceSchema {
            nodes: vec![node],
            relations: Vec::new(),
        }
    }

    #[test]
    fn btreeset_sources_maintains_sorted_order_and_deduplication() {
        let item_schema = single_node_schema("Item", Vec::new());
        let mut merged = GraphSchema::default();

        // Unsorted on purpose, with "alpha" merged twice.
        for source_id in ["charlie", "alpha", "bravo", "alpha"] {
            merged.merge_source_schema(source_id, &item_schema);
        }

        let item_node = merged.nodes.get("Item").unwrap();
        let observed: Vec<_> = item_node.sources.iter().cloned().collect();
        assert_eq!(observed, vec!["alpha", "bravo", "charlie"]);
    }

    #[test]
    fn normalize_table_label_strips_schema_prefix() {
        // (input, expected) pairs: two schema-qualified names and one bare name.
        let cases = [
            ("public.users", "users"),
            ("dbo.orders", "orders"),
            ("customers", "customers"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_table_label(input), expected);
        }
    }

    #[test]
    fn merge_source_schema_combines_properties() {
        // First source knows only an untyped `temperature`.
        let untyped = single_node_schema("Sensor", vec![PropertySchema::new("temperature")]);
        // Second source supplies a typed `temperature` plus a new `humidity`.
        let typed_temperature = PropertySchema {
            name: "temperature".to_string(),
            data_type: Some(PropertyType::Float),
            description: None,
        };
        let typed = single_node_schema(
            "Sensor",
            vec![typed_temperature, PropertySchema::new("humidity")],
        );

        let mut merged = GraphSchema::default();
        merged.merge_source_schema("src-a", &untyped);
        merged.merge_source_schema("src-b", &typed);

        let sensor_node = merged.nodes.get("Sensor").unwrap();
        let expected_sources = BTreeSet::from(["src-a".to_string(), "src-b".to_string()]);
        assert_eq!(sensor_node.sources, expected_sources);
        assert_eq!(sensor_node.properties.len(), 2);

        // temperature should have gotten the type from src-b
        let temperature = sensor_node
            .properties
            .iter()
            .find(|prop| prop.name == "temperature")
            .unwrap();
        assert_eq!(temperature.data_type, Some(PropertyType::Float));
    }
}