1use crate::prelude::*;
14
15use super::schema::{EnrichedValueType, FieldSchema};
16use serde::{Deserialize, Serialize};
17use std::fmt;
18use std::ops::Deref;
19
20#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum OutputMode {
24 Concise,
25 Verbose,
26}
27
28pub trait SpecFormatter {
30 fn format(&self, mode: OutputMode) -> String;
31}
32
33pub type ScopeName = String;
34
35pub type FieldName = String;
40
41pub const ROOT_SCOPE_NAME: &str = "_root";
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
44pub struct FieldPath(pub Vec<FieldName>);
45
46impl Deref for FieldPath {
47 type Target = Vec<FieldName>;
48
49 fn deref(&self) -> &Self::Target {
50 &self.0
51 }
52}
53
54impl fmt::Display for FieldPath {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 if self.is_empty() {
57 write!(f, "*")
58 } else {
59 write!(f, "{}", self.join("."))
60 }
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
67pub struct OpArgName(pub Option<String>);
68
69impl fmt::Display for OpArgName {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 if let Some(arg_name) = &self.0 {
72 write!(f, "${arg_name}")
73 } else {
74 write!(f, "?")
75 }
76 }
77}
78
79impl OpArgName {
80 pub fn is_unnamed(&self) -> bool {
81 self.0.is_none()
82 }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
86pub struct NamedSpec<T> {
87 pub name: String,
88
89 #[serde(flatten)]
90 pub spec: T,
91}
92
93impl<T: fmt::Display> fmt::Display for NamedSpec<T> {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 write!(f, "{}: {}", self.name, self.spec)
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct FieldMapping {
101 #[serde(default, skip_serializing_if = "Option::is_none")]
104 pub scope: Option<ScopeName>,
105
106 pub field_path: FieldPath,
107}
108
109impl fmt::Display for FieldMapping {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 let scope = self.scope.as_deref().unwrap_or("");
112 write!(
113 f,
114 "{}{}",
115 if scope.is_empty() {
116 "".to_string()
117 } else {
118 format!("{scope}.")
119 },
120 self.field_path
121 )
122 }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct ConstantMapping {
127 pub schema: EnrichedValueType,
128 pub value: serde_json::Value,
129}
130
131impl fmt::Display for ConstantMapping {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 let value = serde_json::to_string(&self.value).unwrap_or("#serde_error".to_string());
134 write!(f, "{value}")
135 }
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct StructMapping {
140 pub fields: Vec<NamedSpec<ValueMapping>>,
141}
142
143impl fmt::Display for StructMapping {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 let fields = self
146 .fields
147 .iter()
148 .map(|field| field.name.clone())
149 .collect::<Vec<_>>()
150 .join(",");
151 write!(f, "{fields}")
152 }
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
156#[serde(tag = "kind")]
157pub enum ValueMapping {
158 Constant(ConstantMapping),
159 Field(FieldMapping),
160 }
162
163impl ValueMapping {
164 pub fn is_entire_scope(&self) -> bool {
165 match self {
166 ValueMapping::Field(FieldMapping {
167 scope: None,
168 field_path,
169 }) => field_path.is_empty(),
170 _ => false,
171 }
172 }
173}
174
175impl std::fmt::Display for ValueMapping {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
177 match self {
178 ValueMapping::Constant(v) => write!(
179 f,
180 "{}",
181 serde_json::to_string(&v.value)
182 .unwrap_or_else(|_| "#(invalid json value)".to_string())
183 ),
184 ValueMapping::Field(v) => {
185 write!(f, "{}.{}", v.scope.as_deref().unwrap_or(""), v.field_path)
186 }
187 }
188 }
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct OpArgBinding {
193 #[serde(default, skip_serializing_if = "OpArgName::is_unnamed")]
194 pub arg_name: OpArgName,
195
196 #[serde(flatten)]
197 pub value: ValueMapping,
198}
199
200impl fmt::Display for OpArgBinding {
201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202 if self.arg_name.is_unnamed() {
203 write!(f, "{}", self.value)
204 } else {
205 write!(f, "{}={}", self.arg_name, self.value)
206 }
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct OpSpec {
212 pub kind: String,
213 #[serde(flatten, default)]
214 pub spec: serde_json::Map<String, serde_json::Value>,
215}
216
217impl SpecFormatter for OpSpec {
218 fn format(&self, mode: OutputMode) -> String {
219 match mode {
220 OutputMode::Concise => self.kind.clone(),
221 OutputMode::Verbose => {
222 let spec_str = serde_json::to_string_pretty(&self.spec)
223 .map(|s| {
224 let lines: Vec<&str> = s.lines().collect();
225 if lines.len() < s.lines().count() {
226 lines
227 .into_iter()
228 .chain(["..."])
229 .collect::<Vec<_>>()
230 .join("\n ")
231 } else {
232 lines.join("\n ")
233 }
234 })
235 .unwrap_or("#serde_error".to_string());
236 format!("{}({})", self.kind, spec_str)
237 }
238 }
239 }
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize, Default)]
243pub struct ExecutionOptions {
244 #[serde(default, skip_serializing_if = "Option::is_none")]
245 pub max_inflight_rows: Option<usize>,
246
247 #[serde(default, skip_serializing_if = "Option::is_none")]
248 pub max_inflight_bytes: Option<usize>,
249
250 #[serde(default, skip_serializing_if = "Option::is_none")]
251 pub timeout: Option<std::time::Duration>,
252}
253
254impl ExecutionOptions {
255 pub fn get_concur_control_options(&self) -> concur_control::Options {
256 concur_control::Options {
257 max_inflight_rows: self.max_inflight_rows,
258 max_inflight_bytes: self.max_inflight_bytes,
259 }
260 }
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, Default)]
264pub struct SourceRefreshOptions {
265 pub refresh_interval: Option<std::time::Duration>,
266}
267
268impl fmt::Display for SourceRefreshOptions {
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 let refresh = self
271 .refresh_interval
272 .map(|d| format!("{d:?}"))
273 .unwrap_or("none".to_string());
274 write!(f, "{refresh}")
275 }
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct ImportOpSpec {
280 pub source: OpSpec,
281
282 #[serde(default)]
283 pub refresh_options: SourceRefreshOptions,
284
285 #[serde(default)]
286 pub execution_options: ExecutionOptions,
287}
288
289impl SpecFormatter for ImportOpSpec {
290 fn format(&self, mode: OutputMode) -> String {
291 let source = self.source.format(mode);
292 format!("source={}, refresh={}", source, self.refresh_options)
293 }
294}
295
296impl fmt::Display for ImportOpSpec {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 write!(f, "{}", self.format(OutputMode::Concise))
299 }
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize)]
304pub struct TransformOpSpec {
305 pub inputs: Vec<OpArgBinding>,
306 pub op: OpSpec,
307
308 #[serde(default)]
309 pub execution_options: ExecutionOptions,
310}
311
312impl SpecFormatter for TransformOpSpec {
313 fn format(&self, mode: OutputMode) -> String {
314 let inputs = self
315 .inputs
316 .iter()
317 .map(ToString::to_string)
318 .collect::<Vec<_>>()
319 .join(",");
320 let op_str = self.op.format(mode);
321 match mode {
322 OutputMode::Concise => format!("op={op_str}, inputs={inputs}"),
323 OutputMode::Verbose => format!("op={op_str}, inputs=[{inputs}]"),
324 }
325 }
326}
327
328#[derive(Debug, Clone, Serialize, Deserialize)]
330pub struct ForEachOpSpec {
331 pub field_path: FieldPath,
333 pub op_scope: ReactiveOpScope,
334
335 #[serde(default)]
336 pub execution_options: ExecutionOptions,
337}
338
339impl ForEachOpSpec {
340 pub fn get_label(&self) -> String {
341 format!("Loop over {}", self.field_path)
342 }
343}
344
345impl SpecFormatter for ForEachOpSpec {
346 fn format(&self, mode: OutputMode) -> String {
347 match mode {
348 OutputMode::Concise => self.get_label(),
349 OutputMode::Verbose => format!("field={}", self.field_path),
350 }
351 }
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct CollectOpSpec {
357 pub input: StructMapping,
359 pub scope_name: ScopeName,
361 pub collector_name: FieldName,
363 pub auto_uuid_field: Option<FieldName>,
366}
367
368impl SpecFormatter for CollectOpSpec {
369 fn format(&self, mode: OutputMode) -> String {
370 let uuid = self.auto_uuid_field.as_deref().unwrap_or("none");
371 match mode {
372 OutputMode::Concise => {
373 format!(
374 "collector={}, input={}, uuid={}",
375 self.collector_name, self.input, uuid
376 )
377 }
378 OutputMode::Verbose => {
379 format!(
380 "scope={}, collector={}, input=[{}], uuid={}",
381 self.scope_name, self.collector_name, self.input, uuid
382 )
383 }
384 }
385 }
386}
387
388#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
389pub enum VectorSimilarityMetric {
390 CosineSimilarity,
391 L2Distance,
392 InnerProduct,
393}
394
395impl fmt::Display for VectorSimilarityMetric {
396 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397 match self {
398 VectorSimilarityMetric::CosineSimilarity => write!(f, "Cosine"),
399 VectorSimilarityMetric::L2Distance => write!(f, "L2"),
400 VectorSimilarityMetric::InnerProduct => write!(f, "InnerProduct"),
401 }
402 }
403}
404
405#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
406#[serde(tag = "kind")]
407pub enum VectorIndexMethod {
408 Hnsw {
409 #[serde(default, skip_serializing_if = "Option::is_none")]
410 m: Option<u32>,
411 #[serde(default, skip_serializing_if = "Option::is_none")]
412 ef_construction: Option<u32>,
413 },
414 IvfFlat {
415 #[serde(default, skip_serializing_if = "Option::is_none")]
416 lists: Option<u32>,
417 },
418}
419
420impl VectorIndexMethod {
421 pub fn kind(&self) -> &'static str {
422 match self {
423 Self::Hnsw { .. } => "Hnsw",
424 Self::IvfFlat { .. } => "IvfFlat",
425 }
426 }
427}
428
429impl fmt::Display for VectorIndexMethod {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 match self {
432 Self::Hnsw { m, ef_construction } => {
433 let mut parts = Vec::new();
434 if let Some(m) = m {
435 parts.push(format!("m={}", m));
436 }
437 if let Some(ef) = ef_construction {
438 parts.push(format!("ef_construction={}", ef));
439 }
440 if parts.is_empty() {
441 write!(f, "Hnsw")
442 } else {
443 write!(f, "Hnsw({})", parts.join(","))
444 }
445 }
446 Self::IvfFlat { lists } => {
447 if let Some(lists) = lists {
448 write!(f, "IvfFlat(lists={lists})")
449 } else {
450 write!(f, "IvfFlat")
451 }
452 }
453 }
454 }
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
458pub struct VectorIndexDef {
459 pub field_name: FieldName,
460 pub metric: VectorSimilarityMetric,
461 #[serde(default, skip_serializing_if = "Option::is_none")]
462 pub method: Option<VectorIndexMethod>,
463}
464
465impl fmt::Display for VectorIndexDef {
466 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
467 match &self.method {
468 None => write!(f, "{}:{}", self.field_name, self.metric),
469 Some(method) => write!(f, "{}:{}:{}", self.field_name, self.metric, method),
470 }
471 }
472}
473
474#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
475pub struct FtsIndexDef {
476 pub field_name: FieldName,
477 #[serde(default, skip_serializing_if = "Option::is_none")]
478 pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
479}
480
481impl fmt::Display for FtsIndexDef {
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483 match &self.parameters {
484 None => write!(f, "{}", self.field_name),
485 Some(params) => {
486 let params_str = serde_json::to_string(params).unwrap_or_else(|_| "{}".to_string());
487 write!(f, "{}:{}", self.field_name, params_str)
488 }
489 }
490 }
491}
492
493#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
494pub struct IndexOptions {
495 #[serde(default, skip_serializing_if = "Option::is_none")]
496 pub primary_key_fields: Option<Vec<FieldName>>,
497 #[serde(default, skip_serializing_if = "Vec::is_empty")]
498 pub vector_indexes: Vec<VectorIndexDef>,
499 #[serde(default, skip_serializing_if = "Vec::is_empty")]
500 pub fts_indexes: Vec<FtsIndexDef>,
501}
502
503impl IndexOptions {
504 pub fn primary_key_fields(&self) -> Result<&[FieldName]> {
505 Ok(self
506 .primary_key_fields
507 .as_ref()
508 .ok_or(api_error!("Primary key fields are not set"))?
509 .as_ref())
510 }
511}
512
513impl fmt::Display for IndexOptions {
514 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
515 let primary_keys = self
516 .primary_key_fields
517 .as_ref()
518 .map(|p| p.join(","))
519 .unwrap_or_default();
520 let vector_indexes = self
521 .vector_indexes
522 .iter()
523 .map(|v| v.to_string())
524 .collect::<Vec<_>>()
525 .join(",");
526 let fts_indexes = self
527 .fts_indexes
528 .iter()
529 .map(|f| f.to_string())
530 .collect::<Vec<_>>()
531 .join(",");
532 write!(
533 f,
534 "keys={primary_keys}, vector_indexes={vector_indexes}, fts_indexes={fts_indexes}"
535 )
536 }
537}
538
539#[derive(Debug, Clone, Serialize, Deserialize)]
541pub struct ExportOpSpec {
542 pub collector_name: FieldName,
543 pub target: OpSpec,
544
545 #[serde(default, skip_serializing_if = "Vec::is_empty")]
546 pub attachments: Vec<OpSpec>,
547
548 pub index_options: IndexOptions,
549 pub setup_by_user: bool,
550}
551
552impl SpecFormatter for ExportOpSpec {
553 fn format(&self, mode: OutputMode) -> String {
554 let target_str = self.target.format(mode);
555 let base = format!(
556 "collector={}, target={}, {}",
557 self.collector_name, target_str, self.index_options
558 );
559 match mode {
560 OutputMode::Concise => base,
561 OutputMode::Verbose => format!("{}, setup_by_user={}", base, self.setup_by_user),
562 }
563 }
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
568#[serde(tag = "action")]
569pub enum ReactiveOpSpec {
570 Transform(TransformOpSpec),
571 ForEach(ForEachOpSpec),
572 Collect(CollectOpSpec),
573}
574
575impl SpecFormatter for ReactiveOpSpec {
576 fn format(&self, mode: OutputMode) -> String {
577 match self {
578 ReactiveOpSpec::Transform(t) => format!("Transform: {}", t.format(mode)),
579 ReactiveOpSpec::ForEach(fe) => match mode {
580 OutputMode::Concise => fe.get_label().to_string(),
581 OutputMode::Verbose => format!("ForEach: {}", fe.format(mode)),
582 },
583 ReactiveOpSpec::Collect(c) => format!("Collect: {}", c.format(mode)),
584 }
585 }
586}
587
588#[derive(Debug, Clone, Serialize, Deserialize)]
589pub struct ReactiveOpScope {
590 pub name: ScopeName,
591 pub ops: Vec<NamedSpec<ReactiveOpSpec>>,
592 }
594
595impl fmt::Display for ReactiveOpScope {
596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597 write!(f, "Scope: name={}", self.name)
598 }
599}
600
601#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct FlowInstanceSpec {
604 pub name: String,
606
607 #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
608 pub import_ops: Vec<NamedSpec<ImportOpSpec>>,
609
610 #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
611 pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,
612
613 #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
614 pub export_ops: Vec<NamedSpec<ExportOpSpec>>,
615
616 #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
617 pub declarations: Vec<OpSpec>,
618}
619
620#[derive(Debug, Clone, Serialize, Deserialize)]
621pub struct TransientFlowSpec {
622 pub name: String,
623 pub input_fields: Vec<FieldSchema>,
624 pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,
625 pub output_value: ValueMapping,
626}
627
628impl<T> AuthEntryReference<T> {
629 pub fn new(key: String) -> Self {
630 Self {
631 key,
632 _phantom: std::marker::PhantomData,
633 }
634 }
635}
636pub struct AuthEntryReference<T> {
637 pub key: String,
638 _phantom: std::marker::PhantomData<T>,
639}
640
641impl<T> fmt::Debug for AuthEntryReference<T> {
642 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643 write!(f, "AuthEntryReference({})", self.key)
644 }
645}
646
647impl<T> fmt::Display for AuthEntryReference<T> {
648 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
649 write!(f, "AuthEntryReference({})", self.key)
650 }
651}
652
653impl<T> Clone for AuthEntryReference<T> {
654 fn clone(&self) -> Self {
655 Self::new(self.key.clone())
656 }
657}
658
659#[derive(Serialize, Deserialize)]
660struct UntypedAuthEntryReference<T> {
661 key: T,
662}
663
664impl<T> Serialize for AuthEntryReference<T> {
665 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
666 where
667 S: serde::Serializer,
668 {
669 UntypedAuthEntryReference { key: &self.key }.serialize(serializer)
670 }
671}
672
673impl<'de, T> Deserialize<'de> for AuthEntryReference<T> {
674 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
675 where
676 D: serde::Deserializer<'de>,
677 {
678 let untyped_ref = UntypedAuthEntryReference::<String>::deserialize(deserializer)?;
679 Ok(AuthEntryReference::new(untyped_ref.key))
680 }
681}
682
683impl<T> PartialEq for AuthEntryReference<T> {
684 fn eq(&self, other: &Self) -> bool {
685 self.key == other.key
686 }
687}
688
689impl<T> Eq for AuthEntryReference<T> {}
690
691impl<T> std::hash::Hash for AuthEntryReference<T> {
692 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
693 self.key.hash(state);
694 }
695}