Skip to main content

recoco_core/execution/
live_updater.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::{
14    execution::source_indexer::{ProcessSourceRowInput, SourceIndexingContext},
15    prelude::*,
16};
17
18use super::stats;
19use futures::future::try_join_all;
20use sqlx::PgPool;
21use std::fmt::Write;
22use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior};
23use tracing::Level;
24
/// Change report produced by `FlowLiveUpdater::next_status_updates`.
pub struct FlowLiveUpdaterUpdates {
    /// Names of the sources whose update task has not finished yet.
    pub active_sources: Vec<String>,
    /// Names of the sources that recorded at least one new update since the previous report.
    pub updated_sources: Vec<String>,
}
/// Status snapshot that source update tasks publish over a `watch` channel.
struct FlowLiveUpdaterStatus {
    /// Indices (into the execution plan's import ops) of sources whose task is still alive.
    pub active_source_idx: BTreeSet<usize>,
    /// Per-source counter, bumped each time an update pass or change batch produced changes.
    pub source_updates_num: Vec<usize>,
}
33
34struct UpdateReceiveState {
35    status_rx: watch::Receiver<FlowLiveUpdaterStatus>,
36    last_num_source_updates: Vec<usize>,
37    is_done: bool,
38}
39
40pub struct FlowLiveUpdater {
41    flow_ctx: Arc<FlowContext>,
42    join_set: Mutex<Option<JoinSet<Result<()>>>>,
43    stats_per_task: Vec<Arc<stats::UpdateStats>>,
44    /// Global tracking of in-process rows per operation
45    pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
46    recv_state: tokio::sync::Mutex<UpdateReceiveState>,
47    num_remaining_tasks_rx: watch::Receiver<usize>,
48
49    // Hold tx to avoid dropping the sender.
50    _status_tx: watch::Sender<FlowLiveUpdaterStatus>,
51    _num_remaining_tasks_tx: watch::Sender<usize>,
52}
53
54#[derive(Debug, Clone, Default, Serialize, Deserialize)]
55pub struct FlowLiveUpdaterOptions {
56    /// If true, the updater will keep refreshing the index.
57    /// Otherwise, it will only apply changes from the source up to the current time.
58    pub live_mode: bool,
59
60    /// If true, the updater will reexport the targets even if there's no change.
61    pub reexport_targets: bool,
62
63    /// If true, the updater will reprocess everything and invalidate existing caches.
64    pub full_reprocess: bool,
65
66    /// If true, stats will be printed to the console.
67    pub print_stats: bool,
68}
69
/// Period between in-progress stats reports while an update is running (when reporting is on).
const TRACE_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5_000);
71
72struct SharedAckFn<AckAsyncFn: AsyncFnOnce() -> Result<()>> {
73    count: usize,
74    ack_fn: Option<AckAsyncFn>,
75}
76
77impl<AckAsyncFn: AsyncFnOnce() -> Result<()>> SharedAckFn<AckAsyncFn> {
78    fn new(count: usize, ack_fn: AckAsyncFn) -> Self {
79        Self {
80            count,
81            ack_fn: Some(ack_fn),
82        }
83    }
84
85    async fn ack(v: &Mutex<Self>) -> Result<()> {
86        let ack_fn = {
87            let mut v = v.lock().unwrap();
88            v.count -= 1;
89            if v.count > 0 { None } else { v.ack_fn.take() }
90        };
91        if let Some(ack_fn) = ack_fn {
92            ack_fn().await?;
93        }
94        Ok(())
95    }
96}
97
98struct SourceUpdateTask {
99    source_idx: usize,
100
101    flow: Arc<builder::AnalyzedFlow>,
102    plan: Arc<plan::ExecutionPlan>,
103    execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
104    source_update_stats: Arc<stats::UpdateStats>,
105    operation_in_process_stats: Arc<stats::OperationInProcessStats>,
106    pool: PgPool,
107    options: FlowLiveUpdaterOptions,
108
109    status_tx: watch::Sender<FlowLiveUpdaterStatus>,
110    num_remaining_tasks_tx: watch::Sender<usize>,
111}
112
113impl Drop for SourceUpdateTask {
114    fn drop(&mut self) {
115        self.status_tx.send_modify(|update| {
116            update.active_source_idx.remove(&self.source_idx);
117        });
118        self.num_remaining_tasks_tx.send_modify(|update| {
119            *update -= 1;
120        });
121    }
122}
123
124impl SourceUpdateTask {
125    #[instrument(name = "source_update_task.run", skip_all, fields(flow_name = %self.flow.flow_instance.name, source_name = %self.import_op().name))]
126    async fn run(self) -> Result<()> {
127        let source_indexing_context = self
128            .execution_ctx
129            .get_source_indexing_context(&self.flow, self.source_idx, &self.pool)
130            .await?;
131        let initial_update_options = super::source_indexer::UpdateOptions {
132            expect_little_diff: false,
133            mode: if self.options.full_reprocess {
134                super::source_indexer::UpdateMode::FullReprocess
135            } else if self.options.reexport_targets {
136                super::source_indexer::UpdateMode::ReexportTargets
137            } else {
138                super::source_indexer::UpdateMode::Normal
139            },
140        };
141
142        if !self.options.live_mode {
143            return self
144                .update_one_pass(
145                    source_indexing_context,
146                    "batch update",
147                    initial_update_options,
148                )
149                .await;
150        }
151
152        let mut futs: Vec<BoxFuture<'_, Result<()>>> = Vec::new();
153        let source_idx = self.source_idx;
154        let import_op = self.import_op();
155        let task = &self;
156
157        // Deal with change streams.
158        if let Some(change_stream) = import_op.executor.change_stream().await? {
159            let stats = Arc::new(stats::UpdateStats::default());
160            let stats_to_report = stats.clone();
161
162            let status_tx = self.status_tx.clone();
163            let operation_in_process_stats = self.operation_in_process_stats.clone();
164            let process_change_stream = async move {
165                let mut change_stream = change_stream;
166                let retry_options = retryable::RetryOptions {
167                    retry_timeout: None,
168                    initial_backoff: std::time::Duration::from_secs(5),
169                    max_backoff: std::time::Duration::from_secs(60),
170                };
171                loop {
172                    // Workaround as AsyncFnMut isn't mature yet.
173                    // Should be changed to use AsyncFnMut once it is.
174                    let change_stream = tokio::sync::Mutex::new(&mut change_stream);
175                    let change_msg = retryable::run(
176                        || async {
177                            let mut change_stream = change_stream.lock().await;
178                            change_stream
179                                .next()
180                                .await
181                                .transpose()
182                                .map_err(retryable::Error::retryable)
183                        },
184                        &retry_options,
185                    )
186                    .await
187                    .map_err(Error::from)
188                    .with_context(|| {
189                        format!(
190                            "Error in getting change message for flow `{}` source `{}`",
191                            task.flow.flow_instance.name, import_op.name
192                        )
193                    });
194                    let change_msg = match change_msg {
195                        Ok(Some(change_msg)) => change_msg,
196                        Ok(None) => break,
197                        Err(err) => {
198                            error!("{:?}", err);
199                            continue;
200                        }
201                    };
202
203                    let update_stats = Arc::new(stats::UpdateStats::default());
204                    let ack_fn = {
205                        let status_tx = status_tx.clone();
206                        let update_stats = update_stats.clone();
207                        let change_stream_stats = stats.clone();
208                        async move || {
209                            if update_stats.has_any_change() {
210                                status_tx.send_modify(|update| {
211                                    update.source_updates_num[source_idx] += 1;
212                                });
213                                change_stream_stats.merge(&update_stats);
214                            }
215                            if let Some(ack_fn) = change_msg.ack_fn {
216                                ack_fn().await
217                            } else {
218                                Ok(())
219                            }
220                        }
221                    };
222                    let shared_ack_fn = Arc::new(Mutex::new(SharedAckFn::new(
223                        change_msg.changes.iter().len(),
224                        ack_fn,
225                    )));
226                    for change in change_msg.changes {
227                        let shared_ack_fn = shared_ack_fn.clone();
228                        let concur_permit = import_op
229                            .concurrency_controller
230                            .acquire(concur_control::BYTES_UNKNOWN_YET)
231                            .await?;
232                        tokio::spawn(source_indexing_context.clone().process_source_row(
233                            ProcessSourceRowInput {
234                                key: change.key,
235                                key_aux_info: Some(change.key_aux_info),
236                                data: change.data,
237                            },
238                            super::source_indexer::UpdateMode::Normal,
239                            update_stats.clone(),
240                            Some(operation_in_process_stats.clone()),
241                            concur_permit,
242                            Some(move || async move { SharedAckFn::ack(&shared_ack_fn).await }),
243                        ));
244                    }
245                }
246                Ok(())
247            };
248
249            let slf = &self;
250            futs.push(
251                async move {
252                    slf.run_with_progress_report(
253                        process_change_stream,
254                        &stats_to_report,
255                        "change stream",
256                        None,
257                    )
258                    .await
259                }
260                .boxed(),
261            );
262        }
263
264        // The main update loop.
265        futs.push({
266            async move {
267                let refresh_interval = import_op.refresh_options.refresh_interval;
268
269                task.update_one_pass_with_error_logging(
270                    source_indexing_context,
271                    if refresh_interval.is_some() {
272                        "initial interval update"
273                    } else {
274                        "batch update"
275                    },
276                    initial_update_options,
277                )
278                .await;
279
280                let Some(refresh_interval) = refresh_interval else {
281                    return Ok(());
282                };
283
284                let mut interval = tokio::time::interval(refresh_interval);
285                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
286
287                // tokio::time::interval ticks immediately once; consume it so the first loop waits.
288                interval.tick().await;
289
290                loop {
291                    // Wait for the next scheduled update tick
292                    interval.tick().await;
293
294                    let mut update_fut = Box::pin(task.update_one_pass_with_error_logging(
295                        source_indexing_context,
296                        "interval update",
297                        super::source_indexer::UpdateOptions {
298                            expect_little_diff: true,
299                            mode: super::source_indexer::UpdateMode::Normal,
300                        },
301                    ));
302
303                    tokio::select! {
304                        biased;
305
306                        _ = update_fut.as_mut() => {
307                            // finished within refresh_interval, no warning
308                        }
309
310                        _ = tokio::time::sleep(refresh_interval) => {
311                            // overrun: warn once for this pass, then wait for the pass to finish
312                            warn!(
313                                flow_name = %task.flow.flow_instance.name,
314                                source_name = %task.import_op().name,
315                                update_title = "interval update",
316                                refresh_interval_secs = refresh_interval.as_secs_f64(),
317                                "Live update pass exceeded refresh_interval; interval updates will lag behind"
318                            );
319                            update_fut.as_mut().await;
320                        }
321                    }
322                }
323    }
324    .boxed()
325        });
326
327        try_join_all(futs).await?;
328        Ok(())
329    }
330
331    fn stats_message(
332        &self,
333        stats: &stats::UpdateStats,
334        update_title: &str,
335        start_time: Option<std::time::Instant>,
336    ) -> String {
337        let mut message = format!(
338            "{}.{} ({update_title}):{stats}",
339            self.flow.flow_instance.name,
340            self.import_op().name
341        );
342        if let Some(start_time) = start_time {
343            write!(
344                &mut message,
345                " [elapsed: {:.3}s]",
346                start_time.elapsed().as_secs_f64()
347            )
348            .expect("Failed to write to message");
349        }
350        message
351    }
352
353    fn report_stats(
354        &self,
355        stats: &stats::UpdateStats,
356        update_title: &str,
357        start_time: Option<std::time::Instant>,
358        prefix: &str,
359    ) {
360        if start_time.is_none() && !stats.has_any_change() {
361            return;
362        }
363        if self.options.print_stats {
364            println!(
365                "{prefix}{message}",
366                message = self.stats_message(stats, update_title, start_time)
367            );
368        } else {
369            trace!(
370                "{prefix}{message}",
371                message = self.stats_message(stats, update_title, start_time)
372            );
373        }
374    }
375
376    fn stats_report_enabled(&self) -> bool {
377        self.options.print_stats || tracing::event_enabled!(Level::TRACE)
378    }
379
380    async fn run_with_progress_report(
381        &self,
382        fut: impl Future<Output = Result<()>>,
383        stats: &stats::UpdateStats,
384        update_title: &str,
385        start_time: Option<std::time::Instant>,
386    ) -> Result<()> {
387        let interval = if self.stats_report_enabled() {
388            TRACE_REPORT_INTERVAL
389        } else {
390            return fut.await;
391        };
392        let mut pinned_fut = Box::pin(fut);
393        let mut interval = tokio::time::interval(interval);
394
395        // Use this to skip the first tick if there's no progress bar.
396        let mut report_ready = false;
397        loop {
398            tokio::select! {
399                res = &mut pinned_fut => {
400                    return res;
401                }
402                _ = interval.tick() => {
403                    if report_ready {
404                        self.report_stats(stats, update_title, start_time, "⏳ ");
405                    } else {
406                        report_ready = true;
407                    }
408                }
409            }
410        }
411    }
412
413    async fn update_one_pass(
414        &self,
415        source_indexing_context: &Arc<SourceIndexingContext>,
416        update_title: &str,
417        update_options: super::source_indexer::UpdateOptions,
418    ) -> Result<()> {
419        let start_time = std::time::Instant::now();
420        let update_stats = Arc::new(stats::UpdateStats::default());
421
422        let update_fut = source_indexing_context.update(&update_stats, update_options);
423
424        self.run_with_progress_report(update_fut, &update_stats, update_title, Some(start_time))
425            .await
426            .with_context(|| {
427                format!(
428                    "Error in processing flow `{}` source `{}` ({update_title})",
429                    self.flow.flow_instance.name,
430                    self.import_op().name
431                )
432            })?;
433
434        if update_stats.has_any_change() {
435            self.status_tx.send_modify(|update| {
436                update.source_updates_num[self.source_idx] += 1;
437            });
438        }
439
440        // Report final stats
441        self.report_stats(&update_stats, update_title, Some(start_time), "✅ ");
442        self.source_update_stats.merge(&update_stats);
443        Ok(())
444    }
445
446    async fn update_one_pass_with_error_logging(
447        &self,
448        source_indexing_context: &Arc<SourceIndexingContext>,
449        update_title: &str,
450        update_options: super::source_indexer::UpdateOptions,
451    ) {
452        let result = self
453            .update_one_pass(source_indexing_context, update_title, update_options)
454            .await;
455
456        if let Err(err) = result {
457            error!("{:?}", err);
458        }
459    }
460
461    fn import_op(&self) -> &plan::AnalyzedImportOp {
462        &self.plan.import_ops[self.source_idx]
463    }
464}
465
466impl FlowLiveUpdater {
467    #[instrument(name = "flow_live_updater.start", skip_all, fields(flow_name = %flow_ctx.flow_name()))]
468    pub async fn start(
469        flow_ctx: Arc<FlowContext>,
470        pool: &PgPool,
471        options: FlowLiveUpdaterOptions,
472    ) -> Result<Self> {
473        let plan = flow_ctx.flow.get_execution_plan().await?;
474        let execution_ctx = Arc::new(flow_ctx.use_owned_execution_ctx().await?);
475
476        let (status_tx, status_rx) = watch::channel(FlowLiveUpdaterStatus {
477            active_source_idx: BTreeSet::from_iter(0..plan.import_ops.len()),
478            source_updates_num: vec![0; plan.import_ops.len()],
479        });
480
481        let (num_remaining_tasks_tx, num_remaining_tasks_rx) =
482            watch::channel(plan.import_ops.len());
483
484        let mut join_set = JoinSet::new();
485        let mut stats_per_task = Vec::new();
486        let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());
487
488        for source_idx in 0..plan.import_ops.len() {
489            let source_update_stats = Arc::new(stats::UpdateStats::default());
490            let source_update_task = SourceUpdateTask {
491                source_idx,
492                flow: flow_ctx.flow.clone(),
493                plan: plan.clone(),
494                execution_ctx: execution_ctx.clone(),
495                source_update_stats: source_update_stats.clone(),
496                operation_in_process_stats: operation_in_process_stats.clone(),
497                pool: pool.clone(),
498                options: options.clone(),
499                status_tx: status_tx.clone(),
500                num_remaining_tasks_tx: num_remaining_tasks_tx.clone(),
501            };
502            join_set.spawn(source_update_task.run());
503            stats_per_task.push(source_update_stats);
504        }
505
506        Ok(Self {
507            flow_ctx,
508            join_set: Mutex::new(Some(join_set)),
509            stats_per_task,
510            operation_in_process_stats,
511            recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
512                status_rx,
513                last_num_source_updates: vec![0; plan.import_ops.len()],
514                is_done: false,
515            }),
516            num_remaining_tasks_rx,
517
518            _status_tx: status_tx,
519            _num_remaining_tasks_tx: num_remaining_tasks_tx,
520        })
521    }
522
523    pub async fn wait(&self) -> Result<()> {
524        {
525            let mut rx = self.num_remaining_tasks_rx.clone();
526            rx.wait_for(|v| *v == 0).await?;
527        }
528
529        let Some(mut join_set) = self.join_set.lock().unwrap().take() else {
530            return Ok(());
531        };
532        while let Some(task_result) = join_set.join_next().await {
533            match task_result {
534                Ok(Ok(_)) => {}
535                Ok(Err(err)) => {
536                    return Err(err);
537                }
538                Err(err) if err.is_cancelled() => {}
539                Err(err) => {
540                    return Err(err.into());
541                }
542            }
543        }
544        Ok(())
545    }
546
547    pub fn abort(&self) {
548        let mut join_set = self.join_set.lock().unwrap();
549        if let Some(join_set) = &mut *join_set {
550            join_set.abort_all();
551        }
552    }
553
554    pub fn index_update_info(&self) -> stats::IndexUpdateInfo {
555        stats::IndexUpdateInfo {
556            sources: std::iter::zip(
557                self.flow_ctx.flow.flow_instance.import_ops.iter(),
558                self.stats_per_task.iter(),
559            )
560            .map(|(import_op, stats)| stats::SourceUpdateInfo {
561                source_name: import_op.name.clone(),
562                stats: stats.as_ref().clone(),
563            })
564            .collect(),
565        }
566    }
567
568    pub async fn next_status_updates(&self) -> Result<FlowLiveUpdaterUpdates> {
569        let mut recv_state = self.recv_state.lock().await;
570        let recv_state = &mut *recv_state;
571
572        if recv_state.is_done {
573            return Ok(FlowLiveUpdaterUpdates {
574                active_sources: vec![],
575                updated_sources: vec![],
576            });
577        }
578
579        recv_state.status_rx.changed().await?;
580        let status = recv_state.status_rx.borrow_and_update();
581        let updates = FlowLiveUpdaterUpdates {
582            active_sources: status
583                .active_source_idx
584                .iter()
585                .map(|idx| {
586                    self.flow_ctx.flow.flow_instance.import_ops[*idx]
587                        .name
588                        .clone()
589                })
590                .collect(),
591            updated_sources: status
592                .source_updates_num
593                .iter()
594                .enumerate()
595                .filter_map(|(idx, num_updates)| {
596                    if num_updates > &recv_state.last_num_source_updates[idx] {
597                        Some(
598                            self.flow_ctx.flow.flow_instance.import_ops[idx]
599                                .name
600                                .clone(),
601                        )
602                    } else {
603                        None
604                    }
605                })
606                .collect(),
607        };
608        recv_state.last_num_source_updates = status.source_updates_num.clone();
609        if status.active_source_idx.is_empty() {
610            recv_state.is_done = true;
611        }
612        Ok(updates)
613    }
614}