timeseries_table_datafusion/
ts_table_provider.rs1#[cfg(test)]
2mod tests;
3mod time_predicate;
4use arrow::datatypes::DataType;
5
6use chrono::FixedOffset;
7
8use chrono_tz::Tz;
9pub(crate) use time_predicate::*;
10
11mod pruning;
12pub(crate) use pruning::*;
13use timeseries_table_core::storage::StorageLocation;
14use timeseries_table_core::storage::file_size;
15
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use arrow::datatypes::SchemaRef;
21use async_trait::async_trait;
22
23use datafusion::catalog::Session;
24use datafusion::catalog::TableProvider;
25use datafusion::common::DFSchema;
26
27use datafusion::datasource::listing::PartitionedFile;
28use datafusion::datasource::physical_plan::FileScanConfigBuilder;
29use datafusion::datasource::physical_plan::ParquetSource;
30use datafusion::datasource::source::DataSourceExec;
31use datafusion::error::{DataFusionError, Result as DFResult};
32use datafusion::execution::object_store::ObjectStoreUrl;
33
34use object_store::path::Path as ObjectStorePath;
35
36use datafusion::logical_expr::Expr;
37
38use datafusion::logical_expr::TableProviderFilterPushDown;
39
40use datafusion::logical_expr::utils::conjunction;
41use datafusion::physical_plan::ExecutionPlan;
42use datafusion::physical_plan::expressions::lit;
43use timeseries_table_core::table::TimeSeriesTable;
44use timeseries_table_core::transaction_log::SegmentMeta;
45use timeseries_table_core::transaction_log::TableState;
46use tokio::sync::RwLock;
47
/// DataFusion [`TableProvider`] backed by a [`TimeSeriesTable`].
///
/// Each scan resolves the table's latest snapshot (cached by version), prunes
/// segments using filters on the time column, and plans a Parquet scan over
/// the remaining segment files.
#[derive(Debug)]
pub struct TsTableProvider {
    /// The underlying table: source of snapshots, storage location and index spec.
    table: Arc<TimeSeriesTable>,
    /// Arrow schema derived from the table metadata when the provider was built.
    schema: SchemaRef,
    /// Most recently loaded snapshot together with its table version.
    cache: RwLock<Cache>,

    /// Object store URL under which segment file paths are registered for scans.
    object_store_url: ObjectStoreUrl,
}
61
/// Snapshot cache used by [`TsTableProvider`].
///
/// `version` records which table version `state` was loaded at; the provider
/// writes both fields together.
#[derive(Debug)]
struct Cache {
    /// Table version of the cached `state`.
    version: Option<u64>,
    /// Cached table snapshot; cloned out to callers on a cache hit.
    state: Option<TableState>,
}
67
68#[derive(Debug, Clone, Copy)]
69pub(crate) enum ParsedTz {
70 Utc,
71 Fixed(FixedOffset),
72 Olson(Tz),
73}
74
75fn parse_tz(tz: &str) -> Option<ParsedTz> {
76 if tz.eq_ignore_ascii_case("utc") {
77 return Some(ParsedTz::Utc);
78 }
79 if let Ok(offset) = tz.parse::<FixedOffset>() {
80 return Some(ParsedTz::Fixed(offset));
81 }
82 if let Ok(tz) = tz.parse::<Tz>() {
83 return Some(ParsedTz::Olson(tz));
84 }
85 None
86}
87
88fn df_external<E>(e: E) -> DataFusionError
90where
91 E: std::error::Error + Send + Sync + 'static,
92{
93 DataFusionError::External(Box::new(e))
94}
95
96fn df_exec(msg: impl Into<String>) -> DataFusionError {
98 DataFusionError::Execution(msg.into())
99}
100
101impl TsTableProvider {
102 pub fn try_new(table: Arc<TimeSeriesTable>) -> DFResult<Self> {
104 let schema = table
107 .state()
108 .table_meta
109 .arrow_schema_ref()
110 .map_err(df_external)?;
111
112 let object_store_url = ObjectStoreUrl::parse("file://").map_err(df_external)?; let state = table.state().clone();
114
115 Ok(Self {
116 table,
117 schema,
118 cache: RwLock::new(Cache {
119 version: Some(state.version),
120 state: Some(state),
121 }),
122 object_store_url,
123 })
124 }
125
126 async fn latest_state(&self) -> DFResult<TableState> {
127 let current_version = self.table.current_version().await.map_err(df_external)?;
128
129 {
131 let cache = self.cache.read().await;
132 if cache.version == Some(current_version)
133 && let Some(st) = cache.state.clone()
134 {
135 return Ok(st);
136 }
137 }
138
139 let state = self.table.load_latest_state().await.map_err(df_external)?;
141 let mut cache = self.cache.write().await;
142 cache.version = Some(state.version);
143 cache.state = Some(state.clone());
144 Ok(state)
145 }
146
147 fn segment_abs_path(&self, seg: &SegmentMeta) -> DFResult<PathBuf> {
148 match self.table.location().as_ref() {
149 StorageLocation::Local(root) => Ok(root.join(&seg.path)),
150 }
151 }
152
153 async fn segment_file_size(&self, seg: &SegmentMeta) -> datafusion::error::Result<u64> {
154 if let Some(sz) = seg.file_size {
155 return Ok(sz);
156 }
157
158 let sz = file_size(self.table.location().storage(), Path::new(&seg.path))
159 .await
160 .map_err(|e| {
161 DataFusionError::Execution(format!(
162 "missing Segment.file_size and failed to stat file: {} ({})",
163 seg.path, e
164 ))
165 })?;
166 Ok(sz)
167 }
168
169 fn time_column_name(&self) -> &str {
171 self.table.index_spec().timestamp_column.as_str()
172 }
173
174 fn ts_timezone(&self) -> Option<String> {
175 let ts_col = self.time_column_name();
176 let field = self.schema.field_with_name(ts_col).ok()?;
177 match field.data_type() {
178 DataType::Timestamp(_, Some(tz)) => Some(tz.to_string()),
179 _ => None,
180 }
181 }
182
183 fn prune_segments_by_time<'a>(
184 &self,
185 segments: Vec<&'a SegmentMeta>,
186 filters: &[Expr],
187 ) -> Vec<&'a SegmentMeta> {
188 let ts_col = self.time_column_name();
189 let tz_opt = self.ts_timezone();
190 let parsed_tz = tz_opt.as_deref().and_then(parse_tz);
191
192 let mut saw_any_ts = false;
193 let mut compiled = TimePred::True;
194
195 for f in filters {
196 if expr_mentions_ts(f, ts_col) {
197 saw_any_ts = true;
198 compiled = TimePred::and(compiled, compile_time_pred(f, ts_col, parsed_tz.as_ref()))
199 }
200 }
201
202 if !saw_any_ts {
203 return segments;
204 }
205
206 segments
208 .into_iter()
209 .filter(|seg| {
210 eval_time_pred_on_segment(&compiled, seg.ts_min, seg.ts_max)
211 != IntervalTruth::AlwaysFalse
212 })
213 .collect()
214 }
215}
216
217#[async_trait]
218impl TableProvider for TsTableProvider {
219 fn as_any(&self) -> &dyn std::any::Any {
220 self
221 }
222
223 fn schema(&self) -> arrow::datatypes::SchemaRef {
224 Arc::clone(&self.schema)
225 }
226
227 fn table_type(&self) -> datafusion::datasource::TableType {
228 datafusion::datasource::TableType::Base
229 }
230
231 fn supports_filters_pushdown(
232 &self,
233 filters: &[&Expr],
234 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
235 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
238 }
239
240 async fn scan(
241 &self,
242 state: &dyn Session,
243 projection: Option<&Vec<usize>>,
244 filters: &[Expr], limit: Option<usize>,
246 ) -> DFResult<Arc<dyn ExecutionPlan>> {
247 let snapshot = self.latest_state().await?;
249
250 let segments = snapshot.segments_sorted_by_time();
251
252 let df_schema = DFSchema::try_from(self.schema().as_ref().clone())?;
253 let predicate = conjunction(filters.to_vec());
254 let predicate = predicate
255 .map(|p| state.create_physical_expr(p, &df_schema))
256 .transpose()?
257 .unwrap_or_else(|| lit(true));
258
259 let parquet_source = Arc::new(ParquetSource::default().with_predicate(predicate));
261
262 let mut builder = FileScanConfigBuilder::new(
263 self.object_store_url.clone(),
264 self.schema.clone(),
265 parquet_source,
266 )
267 .with_projection_indices(projection.cloned())
268 .with_limit(limit);
269
270 let selected = self.prune_segments_by_time(segments, filters);
271 for seg in selected {
272 let abs = self.segment_abs_path(seg)?;
273 let abs = std::path::absolute(&abs).map_err(|e| {
274 df_exec(format!(
275 "failed to make segment path absolute {}: {}",
276 abs.display(),
277 e
278 ))
279 })?;
280
281 let file_size = self.segment_file_size(seg).await?;
282
283 let location = ObjectStorePath::from_absolute_path(&abs).map_err(df_external)?;
287 let pf = PartitionedFile::new(location.as_ref(), file_size);
288
289 builder = builder.with_file(pf);
290 }
291
292 let plan = DataSourceExec::from_data_source(builder.build());
294 Ok(plan)
295 }
296}