use crate::{
    execution::source_indexer::{ProcessSourceRowInput, SourceIndexingContext},
    prelude::*,
};

use super::stats;
use futures::future::try_join_all;
use sqlx::PgPool;
use std::fmt::Write;
use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior};
use tracing::Level;

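/// Per-poll status snapshot returned by `FlowLiveUpdater::next_status_updates`:
/// sources that are still being updated, and sources that received updates
/// since the previous poll.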
pub struct FlowLiveUpdaterUpdates {
    pub active_sources: Vec<String>,
    pub updated_sources: Vec<String>,
}

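/// Internal status shared between source update tasks and status consumers
/// via a `watch` channel.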
struct FlowLiveUpdaterStatus {
    pub active_source_idx: BTreeSet<usize>,
    pub source_updates_num: Vec<usize>,
}

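/// Receiver-side bookkeeping for `next_status_updates`: the last observed
/// per-source update counts and whether all sources have finished.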
struct UpdateReceiveState {
    status_rx: watch::Receiver<FlowLiveUpdaterStatus>,
    last_num_source_updates: Vec<usize>,
    is_done: bool,
}

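/// Drives live updates for one flow: spawns one update task per import source
/// and exposes progress, statistics, and completion to callers.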
pub struct FlowLiveUpdater {
    flow_ctx: Arc<FlowContext>,
    join_set: Mutex<Option<JoinSet<Result<()>>>>,
    stats_per_task: Vec<Arc<stats::UpdateStats>>,
    pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
    recv_state: tokio::sync::Mutex<UpdateReceiveState>,
    num_remaining_tasks_rx: watch::Receiver<usize>,

    // Keep the senders alive so the watch channels stay open for the lifetime
    // of the updater.
    _status_tx: watch::Sender<FlowLiveUpdaterStatus>,
    _num_remaining_tasks_tx: watch::Sender<usize>,
}

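/// Options controlling how a `FlowLiveUpdater` runs.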
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FlowLiveUpdaterOptions {
    /// If true, keep watching for source changes after the initial pass
    /// (change streams and periodic refresh intervals).
    /// Otherwise, run a single batch update and stop.
    pub live_mode: bool,

    /// If true, run the initial pass in `UpdateMode::ReexportTargets`.
    pub reexport_targets: bool,

    /// If true, run the initial pass in `UpdateMode::FullReprocess`.
    /// Takes precedence over `reexport_targets`.
    pub full_reprocess: bool,

    /// If true, print stats to stdout; otherwise they are only emitted as trace events.
    pub print_stats: bool,
}

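/// How often in-progress stats are reported while an update pass is still running.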
const TRACE_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);

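/// Wraps an ack function shared by `count` participants; the wrapped function
/// is invoked once, when the last participant acks.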
struct SharedAckFn<AckAsyncFn: AsyncFnOnce() -> Result<()>> {
    count: usize,
    ack_fn: Option<AckAsyncFn>,
}

impl<AckAsyncFn: AsyncFnOnce() -> Result<()>> SharedAckFn<AckAsyncFn> {
    fn new(count: usize, ack_fn: AckAsyncFn) -> Self {
        Self {
            count,
            ack_fn: Some(ack_fn),
        }
    }

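    /// Decrements the remaining count and runs the wrapped ack function once
    /// the last participant has acked.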
    async fn ack(v: &Mutex<Self>) -> Result<()> {
        let ack_fn = {
            let mut v = v.lock().unwrap();
            v.count -= 1;
            if v.count > 0 { None } else { v.ack_fn.take() }
        };
        if let Some(ack_fn) = ack_fn {
            ack_fn().await?;
        }
        Ok(())
    }
}

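/// Update task for a single import source. It owns everything needed to keep
/// the source up to date and reports progress back through the watch channels.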
struct SourceUpdateTask {
    source_idx: usize,

    flow: Arc<builder::AnalyzedFlow>,
    plan: Arc<plan::ExecutionPlan>,
    execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
    source_update_stats: Arc<stats::UpdateStats>,
    operation_in_process_stats: Arc<stats::OperationInProcessStats>,
    pool: PgPool,
    options: FlowLiveUpdaterOptions,

    status_tx: watch::Sender<FlowLiveUpdaterStatus>,
    num_remaining_tasks_tx: watch::Sender<usize>,
}

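// When a task finishes (or is aborted), mark its source as inactive and
// decrement the remaining-task counter so `FlowLiveUpdater::wait` can observe
// completion.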
impl Drop for SourceUpdateTask {
    fn drop(&mut self) {
        self.status_tx.send_modify(|update| {
            update.active_source_idx.remove(&self.source_idx);
        });
        self.num_remaining_tasks_tx.send_modify(|update| {
            *update -= 1;
        });
    }
}

impl SourceUpdateTask {
    #[instrument(name = "source_update_task.run", skip_all, fields(flow_name = %self.flow.flow_instance.name, source_name = %self.import_op().name))]
    async fn run(self) -> Result<()> {
        let source_indexing_context = self
            .execution_ctx
            .get_source_indexing_context(&self.flow, self.source_idx, &self.pool)
            .await?;
        let initial_update_options = super::source_indexer::UpdateOptions {
            expect_little_diff: false,
            mode: if self.options.full_reprocess {
                super::source_indexer::UpdateMode::FullReprocess
            } else if self.options.reexport_targets {
                super::source_indexer::UpdateMode::ReexportTargets
            } else {
                super::source_indexer::UpdateMode::Normal
            },
        };

        // Without live mode, a single batch pass is all we need.
        if !self.options.live_mode {
            return self
                .update_one_pass(
                    source_indexing_context,
                    "batch update",
                    initial_update_options,
                )
                .await;
        }

        let mut futs: Vec<BoxFuture<'_, Result<()>>> = Vec::new();
        let source_idx = self.source_idx;
        let import_op = self.import_op();
        let task = &self;

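        // If the source exposes a change stream, keep consuming it and process
        // each reported change as it arrives.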
        if let Some(change_stream) = import_op.executor.change_stream().await? {
            let stats = Arc::new(stats::UpdateStats::default());
            let stats_to_report = stats.clone();

            let status_tx = self.status_tx.clone();
            let operation_in_process_stats = self.operation_in_process_stats.clone();
            let process_change_stream = async move {
                let mut change_stream = change_stream;
                let retry_options = retryable::RetryOptions {
                    retry_timeout: None,
                    initial_backoff: std::time::Duration::from_secs(5),
                    max_backoff: std::time::Duration::from_secs(60),
                };
                loop {
                    let change_stream = tokio::sync::Mutex::new(&mut change_stream);
                    // Pull the next change message, retrying transient errors with backoff.
                    let change_msg = retryable::run(
                        || async {
                            let mut change_stream = change_stream.lock().await;
                            change_stream
                                .next()
                                .await
                                .transpose()
                                .map_err(retryable::Error::retryable)
                        },
                        &retry_options,
                    )
                    .await
                    .map_err(Error::from)
                    .with_context(|| {
                        format!(
                            "Error in getting change message for flow `{}` source `{}`",
                            task.flow.flow_instance.name, import_op.name
                        )
                    });
                    let change_msg = match change_msg {
                        Ok(Some(change_msg)) => change_msg,
                        Ok(None) => break,
                        Err(err) => {
                            error!("{:?}", err);
                            continue;
                        }
                    };

                    let update_stats = Arc::new(stats::UpdateStats::default());
                    // Acknowledge the change message only after all rows derived from it
                    // have been processed; bump the source update counter if anything changed.
                    let ack_fn = {
                        let status_tx = status_tx.clone();
                        let update_stats = update_stats.clone();
                        let change_stream_stats = stats.clone();
                        async move || {
                            if update_stats.has_any_change() {
                                status_tx.send_modify(|update| {
                                    update.source_updates_num[source_idx] += 1;
                                });
                                change_stream_stats.merge(&update_stats);
                            }
                            if let Some(ack_fn) = change_msg.ack_fn {
                                ack_fn().await
                            } else {
                                Ok(())
                            }
                        }
                    };
                    let shared_ack_fn = Arc::new(Mutex::new(SharedAckFn::new(
                        change_msg.changes.iter().len(),
                        ack_fn,
                    )));
                    // Process each changed row concurrently, bounded by the source's
                    // concurrency controller.
                    for change in change_msg.changes {
                        let shared_ack_fn = shared_ack_fn.clone();
                        let concur_permit = import_op
                            .concurrency_controller
                            .acquire(concur_control::BYTES_UNKNOWN_YET)
                            .await?;
                        tokio::spawn(source_indexing_context.clone().process_source_row(
                            ProcessSourceRowInput {
                                key: change.key,
                                key_aux_info: Some(change.key_aux_info),
                                data: change.data,
                            },
                            super::source_indexer::UpdateMode::Normal,
                            update_stats.clone(),
                            Some(operation_in_process_stats.clone()),
                            concur_permit,
                            Some(move || async move { SharedAckFn::ack(&shared_ack_fn).await }),
                        ));
                    }
                }
                Ok(())
            };

            let slf = &self;
            // Drive the change-stream future, periodically reporting its stats.
            futs.push(
                async move {
                    slf.run_with_progress_report(
                        process_change_stream,
                        &stats_to_report,
                        "change stream",
                        None,
                    )
                    .await
                }
                .boxed(),
            );
        }

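        // Always run an initial update pass; when a refresh interval is configured,
        // keep re-running passes on that cadence.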
        futs.push({
            async move {
                let refresh_interval = import_op.refresh_options.refresh_interval;

                task.update_one_pass_with_error_logging(
                    source_indexing_context,
                    if refresh_interval.is_some() {
                        "initial interval update"
                    } else {
                        "batch update"
                    },
                    initial_update_options,
                )
                .await;

                let Some(refresh_interval) = refresh_interval else {
                    return Ok(());
                };

                let mut interval = tokio::time::interval(refresh_interval);
                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

                // The first tick completes immediately; consume it here since the
                // initial pass has just run.
                interval.tick().await;

                loop {
                    interval.tick().await;

                    let mut update_fut = Box::pin(task.update_one_pass_with_error_logging(
                        source_indexing_context,
                        "interval update",
                        super::source_indexer::UpdateOptions {
                            expect_little_diff: true,
                            mode: super::source_indexer::UpdateMode::Normal,
                        },
                    ));

                    // Prefer finishing the in-flight pass; if it runs longer than the
                    // refresh interval, warn and let it finish before the next tick.
                    tokio::select! {
                        biased;

                        _ = update_fut.as_mut() => {
                        }

                        _ = tokio::time::sleep(refresh_interval) => {
                            warn!(
                                flow_name = %task.flow.flow_instance.name,
                                source_name = %task.import_op().name,
                                update_title = "interval update",
                                refresh_interval_secs = refresh_interval.as_secs_f64(),
                                "Live update pass exceeded refresh_interval; interval updates will lag behind"
                            );
                            update_fut.as_mut().await;
                        }
                    }
                }
            }
            .boxed()
        });

        try_join_all(futs).await?;
        Ok(())
    }

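    /// Formats a one-line stats message for this flow/source, optionally with
    /// the elapsed time of the pass.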
    fn stats_message(
        &self,
        stats: &stats::UpdateStats,
        update_title: &str,
        start_time: Option<std::time::Instant>,
    ) -> String {
        let mut message = format!(
            "{}.{} ({update_title}):{stats}",
            self.flow.flow_instance.name,
            self.import_op().name
        );
        if let Some(start_time) = start_time {
            write!(
                &mut message,
                " [elapsed: {:.3}s]",
                start_time.elapsed().as_secs_f64()
            )
            .expect("Failed to write to message");
        }
        message
    }

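    /// Prints (or traces) a stats line. Skips reporting when there is no start
    /// time and nothing has changed.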
    fn report_stats(
        &self,
        stats: &stats::UpdateStats,
        update_title: &str,
        start_time: Option<std::time::Instant>,
        prefix: &str,
    ) {
        if start_time.is_none() && !stats.has_any_change() {
            return;
        }
        if self.options.print_stats {
            println!(
                "{prefix}{message}",
                message = self.stats_message(stats, update_title, start_time)
            );
        } else {
            trace!(
                "{prefix}{message}",
                message = self.stats_message(stats, update_title, start_time)
            );
        }
    }

    fn stats_report_enabled(&self) -> bool {
        self.options.print_stats || tracing::event_enabled!(Level::TRACE)
    }

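    /// Drives `fut` to completion, periodically reporting `stats` while it is
    /// still running (only when stats reporting is enabled).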
    async fn run_with_progress_report(
        &self,
        fut: impl Future<Output = Result<()>>,
        stats: &stats::UpdateStats,
        update_title: &str,
        start_time: Option<std::time::Instant>,
    ) -> Result<()> {
        let interval = if self.stats_report_enabled() {
            TRACE_REPORT_INTERVAL
        } else {
            return fut.await;
        };
        let mut pinned_fut = Box::pin(fut);
        let mut interval = tokio::time::interval(interval);

        // The first tick completes immediately; skip reporting on it.
        let mut report_ready = false;
        loop {
            tokio::select! {
                res = &mut pinned_fut => {
                    return res;
                }
                _ = interval.tick() => {
                    if report_ready {
                        self.report_stats(stats, update_title, start_time, "⏳ ");
                    } else {
                        report_ready = true;
                    }
                }
            }
        }
    }

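    /// Runs a single update pass over the source. Bumps the per-source update
    /// counter when anything changed and folds the pass stats into the task totals.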
    async fn update_one_pass(
        &self,
        source_indexing_context: &Arc<SourceIndexingContext>,
        update_title: &str,
        update_options: super::source_indexer::UpdateOptions,
    ) -> Result<()> {
        let start_time = std::time::Instant::now();
        let update_stats = Arc::new(stats::UpdateStats::default());

        let update_fut = source_indexing_context.update(&update_stats, update_options);

        self.run_with_progress_report(update_fut, &update_stats, update_title, Some(start_time))
            .await
            .with_context(|| {
                format!(
                    "Error in processing flow `{}` source `{}` ({update_title})",
                    self.flow.flow_instance.name,
                    self.import_op().name
                )
            })?;

        if update_stats.has_any_change() {
            self.status_tx.send_modify(|update| {
                update.source_updates_num[self.source_idx] += 1;
            });
        }

        self.report_stats(&update_stats, update_title, Some(start_time), "✅ ");
        self.source_update_stats.merge(&update_stats);
        Ok(())
    }

    async fn update_one_pass_with_error_logging(
        &self,
        source_indexing_context: &Arc<SourceIndexingContext>,
        update_title: &str,
        update_options: super::source_indexer::UpdateOptions,
    ) {
        let result = self
            .update_one_pass(source_indexing_context, update_title, update_options)
            .await;

        if let Err(err) = result {
            error!("{:?}", err);
        }
    }

    fn import_op(&self) -> &plan::AnalyzedImportOp {
        &self.plan.import_ops[self.source_idx]
    }
}

impl FlowLiveUpdater {
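    /// Spawns one update task per import source and returns immediately; use
    /// `wait` or `next_status_updates` to follow progress.
    ///
    /// A rough usage sketch (assuming a prepared `FlowContext` and `PgPool`):
    /// ```ignore
    /// let updater = FlowLiveUpdater::start(
    ///     flow_ctx,
    ///     &pool,
    ///     FlowLiveUpdaterOptions {
    ///         live_mode: true,
    ///         ..Default::default()
    ///     },
    /// )
    /// .await?;
    /// updater.wait().await?;
    /// ```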
    #[instrument(name = "flow_live_updater.start", skip_all, fields(flow_name = %flow_ctx.flow_name()))]
    pub async fn start(
        flow_ctx: Arc<FlowContext>,
        pool: &PgPool,
        options: FlowLiveUpdaterOptions,
    ) -> Result<Self> {
        let plan = flow_ctx.flow.get_execution_plan().await?;
        let execution_ctx = Arc::new(flow_ctx.use_owned_execution_ctx().await?);

        // All sources start out as active, with zero updates observed.
        let (status_tx, status_rx) = watch::channel(FlowLiveUpdaterStatus {
            active_source_idx: BTreeSet::from_iter(0..plan.import_ops.len()),
            source_updates_num: vec![0; plan.import_ops.len()],
        });

        let (num_remaining_tasks_tx, num_remaining_tasks_rx) =
            watch::channel(plan.import_ops.len());

        let mut join_set = JoinSet::new();
        let mut stats_per_task = Vec::new();
        let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());

        // Spawn one update task per import source.
        for source_idx in 0..plan.import_ops.len() {
            let source_update_stats = Arc::new(stats::UpdateStats::default());
            let source_update_task = SourceUpdateTask {
                source_idx,
                flow: flow_ctx.flow.clone(),
                plan: plan.clone(),
                execution_ctx: execution_ctx.clone(),
                source_update_stats: source_update_stats.clone(),
                operation_in_process_stats: operation_in_process_stats.clone(),
                pool: pool.clone(),
                options: options.clone(),
                status_tx: status_tx.clone(),
                num_remaining_tasks_tx: num_remaining_tasks_tx.clone(),
            };
            join_set.spawn(source_update_task.run());
            stats_per_task.push(source_update_stats);
        }

        Ok(Self {
            flow_ctx,
            join_set: Mutex::new(Some(join_set)),
            stats_per_task,
            operation_in_process_stats,
            recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
                status_rx,
                last_num_source_updates: vec![0; plan.import_ops.len()],
                is_done: false,
            }),
            num_remaining_tasks_rx,

            _status_tx: status_tx,
            _num_remaining_tasks_tx: num_remaining_tasks_tx,
        })
    }

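    /// Waits until all source update tasks have finished and surfaces the first
    /// task error, ignoring cancelled tasks.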
    pub async fn wait(&self) -> Result<()> {
        {
            let mut rx = self.num_remaining_tasks_rx.clone();
            rx.wait_for(|v| *v == 0).await?;
        }

        let Some(mut join_set) = self.join_set.lock().unwrap().take() else {
            return Ok(());
        };
        while let Some(task_result) = join_set.join_next().await {
            match task_result {
                Ok(Ok(_)) => {}
                Ok(Err(err)) => {
                    return Err(err);
                }
                Err(err) if err.is_cancelled() => {}
                Err(err) => {
                    return Err(err.into());
                }
            }
        }
        Ok(())
    }

    pub fn abort(&self) {
        let mut join_set = self.join_set.lock().unwrap();
        if let Some(join_set) = &mut *join_set {
            join_set.abort_all();
        }
    }

    pub fn index_update_info(&self) -> stats::IndexUpdateInfo {
        stats::IndexUpdateInfo {
            sources: std::iter::zip(
                self.flow_ctx.flow.flow_instance.import_ops.iter(),
                self.stats_per_task.iter(),
            )
            .map(|(import_op, stats)| stats::SourceUpdateInfo {
                source_name: import_op.name.clone(),
                stats: stats.as_ref().clone(),
            })
            .collect(),
        }
    }

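    /// Waits for the next status change and reports which sources are still
    /// active and which received updates since the previous call. Once all
    /// sources are done, further calls return empty lists without blocking.
    ///
    /// A minimal polling sketch (assuming `updater` is a running `FlowLiveUpdater`):
    /// ```ignore
    /// loop {
    ///     let updates = updater.next_status_updates().await?;
    ///     for source in &updates.updated_sources {
    ///         println!("source `{source}` received an update");
    ///     }
    ///     if updates.active_sources.is_empty() {
    ///         break; // all sources are done
    ///     }
    /// }
    /// ```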
    pub async fn next_status_updates(&self) -> Result<FlowLiveUpdaterUpdates> {
        let mut recv_state = self.recv_state.lock().await;
        let recv_state = &mut *recv_state;

        // Once all sources have finished, report no activity without blocking.
        if recv_state.is_done {
            return Ok(FlowLiveUpdaterUpdates {
                active_sources: vec![],
                updated_sources: vec![],
            });
        }

        // Wait for the next status change, then diff against the last observed counts.
        recv_state.status_rx.changed().await?;
        let status = recv_state.status_rx.borrow_and_update();
        let updates = FlowLiveUpdaterUpdates {
            active_sources: status
                .active_source_idx
                .iter()
                .map(|idx| {
                    self.flow_ctx.flow.flow_instance.import_ops[*idx]
                        .name
                        .clone()
                })
                .collect(),
            updated_sources: status
                .source_updates_num
                .iter()
                .enumerate()
                .filter_map(|(idx, num_updates)| {
                    if num_updates > &recv_state.last_num_source_updates[idx] {
                        Some(
                            self.flow_ctx.flow.flow_instance.import_ops[idx]
                                .name
                                .clone(),
                        )
                    } else {
                        None
                    }
                })
                .collect(),
        };
        recv_state.last_num_source_updates = status.source_updates_num.clone();
        if status.active_source_idx.is_empty() {
            recv_state.is_done = true;
        }
        Ok(updates)
    }
}