1use crate::prelude::*;
14
15use super::spec::*;
16use crate::builder::plan::AnalyzedValueMapping;
17
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19pub struct VectorTypeSchema {
20 pub element_type: Box<BasicValueType>,
21 pub dimension: Option<usize>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25pub struct UnionTypeSchema {
26 pub types: Vec<BasicValueType>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
30#[serde(tag = "kind")]
31pub enum BasicValueType {
32 Bytes,
34
35 Str,
37
38 Bool,
40
41 Int64,
43
44 Float32,
46
47 Float64,
49
50 Range,
52
53 Uuid,
55
56 Date,
58
59 Time,
61
62 LocalDateTime,
64
65 OffsetDateTime,
67
68 TimeDelta,
70
71 Json,
73
74 Vector(VectorTypeSchema),
76
77 Union(UnionTypeSchema),
79}
80
81impl std::fmt::Display for BasicValueType {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 BasicValueType::Bytes => write!(f, "Bytes"),
85 BasicValueType::Str => write!(f, "Str"),
86 BasicValueType::Bool => write!(f, "Bool"),
87 BasicValueType::Int64 => write!(f, "Int64"),
88 BasicValueType::Float32 => write!(f, "Float32"),
89 BasicValueType::Float64 => write!(f, "Float64"),
90 BasicValueType::Range => write!(f, "Range"),
91 BasicValueType::Uuid => write!(f, "Uuid"),
92 BasicValueType::Date => write!(f, "Date"),
93 BasicValueType::Time => write!(f, "Time"),
94 BasicValueType::LocalDateTime => write!(f, "LocalDateTime"),
95 BasicValueType::OffsetDateTime => write!(f, "OffsetDateTime"),
96 BasicValueType::TimeDelta => write!(f, "TimeDelta"),
97 BasicValueType::Json => write!(f, "Json"),
98 BasicValueType::Vector(s) => {
99 write!(f, "Vector[{}", s.element_type)?;
100 if let Some(dimension) = s.dimension {
101 write!(f, ", {dimension}")?;
102 }
103 write!(f, "]")
104 }
105 BasicValueType::Union(s) => {
106 write!(f, "Union[")?;
107 for (i, typ) in s.types.iter().enumerate() {
108 if i > 0 {
109 write!(f, " | ")?;
111 }
112 write!(f, "{typ}")?;
113 }
114 write!(f, "]")
115 }
116 }
117 }
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
121pub struct StructSchema {
122 pub fields: Arc<Vec<FieldSchema>>,
123
124 #[serde(default, skip_serializing_if = "Option::is_none")]
125 pub description: Option<Arc<str>>,
126}
127
128impl StructSchema {
129 pub fn without_attrs(&self) -> Self {
130 Self {
131 fields: Arc::new(self.fields.iter().map(|f| f.without_attrs()).collect()),
132 description: None,
133 }
134 }
135}
136
137impl std::fmt::Display for StructSchema {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 write!(f, "Struct(")?;
140 for (i, field) in self.fields.iter().enumerate() {
141 if i > 0 {
142 write!(f, ", ")?;
143 }
144 write!(f, "{field}")?;
145 }
146 write!(f, ")")
147 }
148}
149
150#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
151pub struct KTableInfo {
152 #[serde(default = "default_num_key_parts")]
154 pub num_key_parts: usize,
155}
156
/// Serde default for `KTableInfo::num_key_parts`: a single key part.
fn default_num_key_parts() -> usize {
    const SINGLE_KEY_PART: usize = 1;
    SINGLE_KEY_PART
}
160
161#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
162#[serde(tag = "kind")]
163#[allow(clippy::enum_variant_names)]
164pub enum TableKind {
165 UTable,
167 #[serde(alias = "Table")]
169 KTable(KTableInfo),
170
171 #[serde(alias = "List")]
173 LTable,
174}
175
176impl std::fmt::Display for TableKind {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 match self {
179 TableKind::UTable => write!(f, "Table"),
180 TableKind::KTable(KTableInfo { num_key_parts }) => write!(f, "KTable({num_key_parts})"),
181 TableKind::LTable => write!(f, "LTable"),
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
187pub struct TableSchema {
188 #[serde(flatten)]
189 pub kind: TableKind,
190
191 pub row: StructSchema,
192}
193
194impl std::fmt::Display for TableSchema {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 write!(f, "{}({})", self.kind, self.row)
197 }
198}
199
200impl TableSchema {
201 pub fn new(kind: TableKind, row: StructSchema) -> Self {
202 Self { kind, row }
203 }
204
205 pub fn has_key(&self) -> bool {
206 !self.key_schema().is_empty()
207 }
208
209 pub fn without_attrs(&self) -> Self {
210 Self {
211 kind: self.kind,
212 row: self.row.without_attrs(),
213 }
214 }
215
216 pub fn key_schema(&self) -> &[FieldSchema] {
217 match self.kind {
218 TableKind::KTable(KTableInfo { num_key_parts: n }) => &self.row.fields[..n],
219 TableKind::UTable | TableKind::LTable => &[],
220 }
221 }
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
225#[serde(tag = "kind")]
226pub enum ValueType {
227 Struct(StructSchema),
228
229 #[serde(untagged)]
230 Basic(BasicValueType),
231
232 #[serde(untagged)]
233 Table(TableSchema),
234}
235
236impl ValueType {
237 pub fn key_schema(&self) -> &[FieldSchema] {
238 match self {
239 ValueType::Basic(_) => &[],
240 ValueType::Struct(_) => &[],
241 ValueType::Table(c) => c.key_schema(),
242 }
243 }
244
245 pub fn without_attrs(&self) -> Self {
247 match self {
248 ValueType::Basic(a) => ValueType::Basic(a.clone()),
249 ValueType::Struct(a) => ValueType::Struct(a.without_attrs()),
250 ValueType::Table(a) => ValueType::Table(a.without_attrs()),
251 }
252 }
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
256pub struct EnrichedValueType<DataType = ValueType> {
257 #[serde(rename = "type")]
258 pub typ: DataType,
259
260 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
261 pub nullable: bool,
262
263 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
264 pub attrs: Arc<BTreeMap<String, serde_json::Value>>,
265}
266
267impl EnrichedValueType {
268 pub fn without_attrs(&self) -> Self {
269 Self {
270 typ: self.typ.without_attrs(),
271 nullable: self.nullable,
272 attrs: Default::default(),
273 }
274 }
275
276 pub fn with_nullable(mut self, nullable: bool) -> Self {
277 self.nullable = nullable;
278 self
279 }
280}
281
282impl<DataType> EnrichedValueType<DataType> {
283 pub fn from_alternative<AltDataType>(
284 value_type: &EnrichedValueType<AltDataType>,
285 ) -> Result<Self>
286 where
287 for<'a> &'a AltDataType: TryInto<DataType, Error = Error>,
288 {
289 Ok(Self {
290 typ: (&value_type.typ).try_into()?,
291 nullable: value_type.nullable,
292 attrs: value_type.attrs.clone(),
293 })
294 }
295
296 pub fn with_attr(mut self, key: &str, value: serde_json::Value) -> Self {
297 Arc::make_mut(&mut self.attrs).insert(key.to_string(), value);
298 self
299 }
300}
301
302impl std::fmt::Display for EnrichedValueType {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 write!(f, "{}", self.typ)?;
305 if self.nullable {
306 write!(f, "?")?;
307 }
308 if !self.attrs.is_empty() {
309 write!(
310 f,
311 " [{}]",
312 self.attrs
313 .iter()
314 .map(|(k, v)| format!("{k}: {v}"))
315 .collect::<Vec<_>>()
316 .join(", ")
317 )?;
318 }
319 Ok(())
320 }
321}
322
323impl std::fmt::Display for ValueType {
324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325 match self {
326 ValueType::Basic(b) => write!(f, "{b}"),
327 ValueType::Struct(s) => write!(f, "{s}"),
328 ValueType::Table(c) => write!(f, "{c}"),
329 }
330 }
331}
332
333#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
334pub struct FieldSchema<DataType = ValueType> {
335 pub name: FieldName,
337
338 #[serde(flatten)]
339 pub value_type: EnrichedValueType<DataType>,
340
341 #[serde(default, skip_serializing_if = "Option::is_none")]
343 pub description: Option<Arc<str>>,
344}
345
346impl FieldSchema {
347 pub fn new(name: impl ToString, value_type: EnrichedValueType) -> Self {
348 Self {
349 name: name.to_string(),
350 value_type,
351 description: None,
352 }
353 }
354
355 pub fn new_with_description(
356 name: impl ToString,
357 value_type: EnrichedValueType,
358 description: Option<impl ToString>,
359 ) -> Self {
360 Self {
361 name: name.to_string(),
362 value_type,
363 description: description.map(|d| d.to_string().into()),
364 }
365 }
366
367 pub fn without_attrs(&self) -> Self {
368 Self {
369 name: self.name.clone(),
370 value_type: self.value_type.without_attrs(),
371 description: None,
372 }
373 }
374}
375
376impl<DataType> FieldSchema<DataType> {
377 pub fn from_alternative<AltDataType>(field: &FieldSchema<AltDataType>) -> Result<Self>
378 where
379 for<'a> &'a AltDataType: TryInto<DataType, Error = Error>,
380 {
381 Ok(Self {
382 name: field.name.clone(),
383 value_type: EnrichedValueType::from_alternative(&field.value_type)?,
384 description: field.description.clone(),
385 })
386 }
387}
388
389impl std::fmt::Display for FieldSchema {
390 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
391 write!(f, "{}: {}", self.name, self.value_type)
392 }
393}
394
395#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
396pub struct CollectorSchema {
397 pub fields: Vec<FieldSchema>,
398 pub auto_uuid_field_idx: Option<usize>,
400}
401
402impl std::fmt::Display for CollectorSchema {
403 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
404 write!(f, "Collector(")?;
405 for (i, field) in self.fields.iter().enumerate() {
406 if i > 0 {
407 write!(f, ", ")?;
408 }
409 write!(f, "{field}")?;
410 }
411 write!(f, ")")
412 }
413}
414
415impl CollectorSchema {
416 pub fn from_fields(fields: Vec<FieldSchema>, auto_uuid_field: Option<FieldName>) -> Self {
417 let mut fields = fields;
418 let auto_uuid_field_idx = if let Some(auto_uuid_field) = auto_uuid_field {
419 fields.insert(
420 0,
421 FieldSchema::new(
422 auto_uuid_field,
423 EnrichedValueType {
424 typ: ValueType::Basic(BasicValueType::Uuid),
425 nullable: false,
426 attrs: Default::default(),
427 },
428 ),
429 );
430 Some(0)
431 } else {
432 None
433 };
434 Self {
435 fields,
436 auto_uuid_field_idx,
437 }
438 }
439 pub fn without_attrs(&self) -> Self {
440 Self {
441 fields: self.fields.iter().map(|f| f.without_attrs()).collect(),
442 auto_uuid_field_idx: self.auto_uuid_field_idx,
443 }
444 }
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize, Default)]
448pub struct OpScopeSchema {
449 pub op_output_types: HashMap<FieldName, EnrichedValueType>,
451
452 pub op_scopes: HashMap<String, Arc<OpScopeSchema>>,
454
455 pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
457}
458
459#[derive(Debug, Clone, Serialize, Deserialize)]
461pub struct FlowSchema {
462 pub schema: StructSchema,
463
464 pub root_op_scope: OpScopeSchema,
465}
466
467impl std::ops::Deref for FlowSchema {
468 type Target = StructSchema;
469
470 fn deref(&self) -> &Self::Target {
471 &self.schema
472 }
473}
474
475pub struct OpArgSchema {
476 pub name: OpArgName,
477 pub value_type: EnrichedValueType,
478 pub analyzed_value: AnalyzedValueMapping,
479}
480
481pub trait TypeCore {
483 fn into_type(self) -> ValueType;
484}
485
486impl TypeCore for BasicValueType {
487 fn into_type(self) -> ValueType {
488 ValueType::Basic(self)
489 }
490}
491
492impl TypeCore for StructSchema {
493 fn into_type(self) -> ValueType {
494 ValueType::Struct(self)
495 }
496}
497
498impl TypeCore for TableSchema {
499 fn into_type(self) -> ValueType {
500 ValueType::Table(self)
501 }
502}
503
504pub fn make_output_type<Type: TypeCore>(value_type: Type) -> EnrichedValueType {
505 EnrichedValueType {
506 typ: value_type.into_type(),
507 attrs: Default::default(),
508 nullable: false,
509 }
510}