Skip to main content

timeseries_table_datafusion/
ts_table_provider.rs

1#[cfg(test)]
2mod tests;
3mod time_predicate;
4use arrow::datatypes::DataType;
5
6use chrono::FixedOffset;
7
8use chrono_tz::Tz;
9pub(crate) use time_predicate::*;
10
11mod pruning;
12pub(crate) use pruning::*;
13use timeseries_table_core::storage::StorageLocation;
14use timeseries_table_core::storage::file_size;
15
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use arrow::datatypes::SchemaRef;
21use async_trait::async_trait;
22
23use datafusion::catalog::Session;
24use datafusion::catalog::TableProvider;
25use datafusion::common::DFSchema;
26
27use datafusion::datasource::listing::PartitionedFile;
28use datafusion::datasource::physical_plan::FileScanConfigBuilder;
29use datafusion::datasource::physical_plan::ParquetSource;
30use datafusion::datasource::source::DataSourceExec;
31use datafusion::error::{DataFusionError, Result as DFResult};
32use datafusion::execution::object_store::ObjectStoreUrl;
33
34use object_store::path::Path as ObjectStorePath;
35
36use datafusion::logical_expr::Expr;
37
38use datafusion::logical_expr::TableProviderFilterPushDown;
39
40use datafusion::logical_expr::utils::conjunction;
41use datafusion::physical_plan::ExecutionPlan;
42use datafusion::physical_plan::expressions::lit;
43use timeseries_table_core::table::TimeSeriesTable;
44use timeseries_table_core::transaction_log::SegmentMeta;
45use timeseries_table_core::transaction_log::TableState;
46use tokio::sync::RwLock;
47
48/// DataFusion table provider for a timeseries table schema.
49///
50/// The schema is captured when the provider is constructed. If the table schema
51/// evolves, re-register a new provider to pick up the updated schema.
52#[derive(Debug)]
53pub struct TsTableProvider {
54    table: Arc<TimeSeriesTable>,
55    schema: SchemaRef,
56    cache: RwLock<Cache>,
57
58    // Baseline: local filesystem only
59    object_store_url: ObjectStoreUrl,
60}
61
62#[derive(Debug)]
63struct Cache {
64    version: Option<u64>,
65    state: Option<TableState>,
66}
67
68#[derive(Debug, Clone, Copy)]
69pub(crate) enum ParsedTz {
70    Utc,
71    Fixed(FixedOffset),
72    Olson(Tz),
73}
74
75fn parse_tz(tz: &str) -> Option<ParsedTz> {
76    if tz.eq_ignore_ascii_case("utc") {
77        return Some(ParsedTz::Utc);
78    }
79    if let Ok(offset) = tz.parse::<FixedOffset>() {
80        return Some(ParsedTz::Fixed(offset));
81    }
82    if let Ok(tz) = tz.parse::<Tz>() {
83        return Some(ParsedTz::Olson(tz));
84    }
85    None
86}
87
88/// Wrap a generic error for DataFusion APIs.
89fn df_external<E>(e: E) -> DataFusionError
90where
91    E: std::error::Error + Send + Sync + 'static,
92{
93    DataFusionError::External(Box::new(e))
94}
95
96/// Build a DataFusion execution error from a message.
97fn df_exec(msg: impl Into<String>) -> DataFusionError {
98    DataFusionError::Execution(msg.into())
99}
100
101impl TsTableProvider {
102    /// Creates a new provider backed by the given `TimeSeriesTable`.
103    pub fn try_new(table: Arc<TimeSeriesTable>) -> DFResult<Self> {
104        // Use the table's current in-memory snapshot to get schema.
105        // (No schema evolution in v0.1, so this is stable.)
106        let schema = table
107            .state()
108            .table_meta
109            .arrow_schema_ref()
110            .map_err(df_external)?;
111
112        let object_store_url = ObjectStoreUrl::parse("file://").map_err(df_external)?; // baseline: local FS
113        let state = table.state().clone();
114
115        Ok(Self {
116            table,
117            schema,
118            cache: RwLock::new(Cache {
119                version: Some(state.version),
120                state: Some(state),
121            }),
122            object_store_url,
123        })
124    }
125
126    async fn latest_state(&self) -> DFResult<TableState> {
127        let current_version = self.table.current_version().await.map_err(df_external)?;
128
129        // Fast path: cache hit
130        {
131            let cache = self.cache.read().await;
132            if cache.version == Some(current_version)
133                && let Some(st) = cache.state.clone()
134            {
135                return Ok(st);
136            }
137        }
138
139        // Refresh from log
140        let state = self.table.load_latest_state().await.map_err(df_external)?;
141        let mut cache = self.cache.write().await;
142        cache.version = Some(state.version);
143        cache.state = Some(state.clone());
144        Ok(state)
145    }
146
147    fn segment_abs_path(&self, seg: &SegmentMeta) -> DFResult<PathBuf> {
148        match self.table.location().as_ref() {
149            StorageLocation::Local(root) => Ok(root.join(&seg.path)),
150        }
151    }
152
153    async fn segment_file_size(&self, seg: &SegmentMeta) -> datafusion::error::Result<u64> {
154        if let Some(sz) = seg.file_size {
155            return Ok(sz);
156        }
157
158        let sz = file_size(self.table.location().storage(), Path::new(&seg.path))
159            .await
160            .map_err(|e| {
161                DataFusionError::Execution(format!(
162                    "missing Segment.file_size and failed to stat file: {} ({})",
163                    seg.path, e
164                ))
165            })?;
166        Ok(sz)
167    }
168
169    /// Return the time column name from the table's index spec.
170    fn time_column_name(&self) -> &str {
171        self.table.index_spec().timestamp_column.as_str()
172    }
173
174    fn ts_timezone(&self) -> Option<String> {
175        let ts_col = self.time_column_name();
176        let field = self.schema.field_with_name(ts_col).ok()?;
177        match field.data_type() {
178            DataType::Timestamp(_, Some(tz)) => Some(tz.to_string()),
179            _ => None,
180        }
181    }
182
183    fn prune_segments_by_time<'a>(
184        &self,
185        segments: Vec<&'a SegmentMeta>,
186        filters: &[Expr],
187    ) -> Vec<&'a SegmentMeta> {
188        let ts_col = self.time_column_name();
189        let tz_opt = self.ts_timezone();
190        let parsed_tz = tz_opt.as_deref().and_then(parse_tz);
191
192        let mut saw_any_ts = false;
193        let mut compiled = TimePred::True;
194
195        for f in filters {
196            if expr_mentions_ts(f, ts_col) {
197                saw_any_ts = true;
198                compiled = TimePred::and(compiled, compile_time_pred(f, ts_col, parsed_tz.as_ref()))
199            }
200        }
201
202        if !saw_any_ts {
203            return segments;
204        }
205
206        // Prune only if definitely false for that segment.
207        segments
208            .into_iter()
209            .filter(|seg| {
210                eval_time_pred_on_segment(&compiled, seg.ts_min, seg.ts_max)
211                    != IntervalTruth::AlwaysFalse
212            })
213            .collect()
214    }
215}
216
217#[async_trait]
218impl TableProvider for TsTableProvider {
219    fn as_any(&self) -> &dyn std::any::Any {
220        self
221    }
222
223    fn schema(&self) -> arrow::datatypes::SchemaRef {
224        Arc::clone(&self.schema)
225    }
226
227    fn table_type(&self) -> datafusion::datasource::TableType {
228        datafusion::datasource::TableType::Base
229    }
230
231    fn supports_filters_pushdown(
232        &self,
233        filters: &[&Expr],
234    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
235        // Inexact: we may prune files, and Parquet may prune row groups/pages,
236        // but DataFusion will still apply the filter for correctness.
237        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
238    }
239
240    async fn scan(
241        &self,
242        state: &dyn Session,
243        projection: Option<&Vec<usize>>,
244        filters: &[Expr], // may include all WHERE predicates
245        limit: Option<usize>,
246    ) -> DFResult<Arc<dyn ExecutionPlan>> {
247        // 1) Get a snapshot (TableState) from core table
248        let snapshot = self.latest_state().await?;
249
250        let segments = snapshot.segments_sorted_by_time();
251
252        let df_schema = DFSchema::try_from(self.schema().as_ref().clone())?;
253        let predicate = conjunction(filters.to_vec());
254        let predicate = predicate
255            .map(|p| state.create_physical_expr(p, &df_schema))
256            .transpose()?
257            .unwrap_or_else(|| lit(true));
258
259        // Build Parquet scan plan (DataSourceExec + ParquetSource)
260        let parquet_source = Arc::new(ParquetSource::default().with_predicate(predicate));
261
262        let mut builder = FileScanConfigBuilder::new(
263            self.object_store_url.clone(),
264            self.schema.clone(),
265            parquet_source,
266        )
267        .with_projection_indices(projection.cloned())
268        .with_limit(limit);
269
270        let selected = self.prune_segments_by_time(segments, filters);
271        for seg in selected {
272            let abs = self.segment_abs_path(seg)?;
273            let abs = std::path::absolute(&abs).map_err(|e| {
274                df_exec(format!(
275                    "failed to make segment path absolute {}: {}",
276                    abs.display(),
277                    e
278                ))
279            })?;
280
281            let file_size = self.segment_file_size(seg).await?;
282
283            // PartitionedFile expects an object_store Path string delimited by `/` (file URI
284            // semantics). Convert from the platform-native filesystem path to avoid Windows
285            // path quirks (e.g. `\\?\` canonicalization prefixes).
286            let location = ObjectStorePath::from_absolute_path(&abs).map_err(df_external)?;
287            let pf = PartitionedFile::new(location.as_ref(), file_size);
288
289            builder = builder.with_file(pf);
290        }
291
292        // Produce the execution plan
293        let plan = DataSourceExec::from_data_source(builder.build());
294        Ok(plan)
295    }
296}