Skip to main content

uni_store/storage/
index_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Index lifecycle management: creation, rebuild, and incremental updates for all index types.
5
6#[cfg(feature = "lance-backend")]
7use crate::storage::inverted_index::InvertedIndex;
8use crate::storage::vertex::VertexDataset;
9use anyhow::{Result, anyhow};
10use arrow_array::UInt64Array;
11use chrono::{DateTime, Utc};
12#[cfg(feature = "lance-backend")]
13use lance::index::vector::VectorIndexParams;
14#[cfg(feature = "lance-backend")]
15use lance_index::progress::IndexBuildProgress;
16#[cfg(feature = "lance-backend")]
17use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
18#[cfg(feature = "lance-backend")]
19use lance_index::vector::hnsw::builder::HnswBuildParams;
20#[cfg(feature = "lance-backend")]
21use lance_index::vector::ivf::IvfBuildParams;
22#[cfg(feature = "lance-backend")]
23use lance_index::vector::pq::PQBuildParams;
24#[cfg(feature = "lance-backend")]
25use lance_index::vector::sq::builder::SQBuildParams;
26#[cfg(feature = "lance-backend")]
27use lance_index::{DatasetIndexExt, IndexType};
28#[cfg(feature = "lance-backend")]
29use lance_linalg::distance::MetricType;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::collections::HashMap;
33#[cfg(feature = "lance-backend")]
34use std::collections::HashSet;
35use std::sync::Arc;
36#[cfg(feature = "lance-backend")]
37use tracing::{debug, info, instrument, warn};
38use uni_common::core::id::Vid;
39#[cfg(feature = "lance-backend")]
40use uni_common::core::schema::IndexDefinition;
41use uni_common::core::schema::SchemaManager;
42#[cfg(feature = "lance-backend")]
43use uni_common::core::schema::{
44    DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
45    ScalarIndexConfig, VectorIndexConfig, VectorIndexType,
46};
47
48/// Validates that a column name contains only safe characters to prevent SQL injection.
49///
50/// Issue #8: Column names must be sanitized before interpolation in SQL queries.
51/// Allows only alphanumeric characters and underscores.
52fn is_valid_column_name(name: &str) -> bool {
53    !name.is_empty() && name.chars().all(|c| c.is_alphanumeric() || c == '_')
54}
55
56/// Tracing-based progress reporter for Lance index builds.
57///
58/// Emits structured log events at each stage boundary, enabling
59/// observability into index build duration and progress.
60#[cfg(feature = "lance-backend")]
61#[derive(Debug)]
62pub struct TracingIndexProgress {
63    index_name: String,
64}
65
66#[cfg(feature = "lance-backend")]
67impl TracingIndexProgress {
68    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
69        Arc::new(Self {
70            index_name: index_name.to_string(),
71        })
72    }
73}
74
75#[cfg(feature = "lance-backend")]
76#[async_trait::async_trait]
77impl IndexBuildProgress for TracingIndexProgress {
78    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
79        info!(
80            index = %self.index_name,
81            stage,
82            ?total,
83            unit,
84            "Index build stage started"
85        );
86        Ok(())
87    }
88
89    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
90        debug!(
91            index = %self.index_name,
92            stage,
93            completed,
94            "Index build progress"
95        );
96        Ok(())
97    }
98
99    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
100        info!(
101            index = %self.index_name,
102            stage,
103            "Index build stage complete"
104        );
105        Ok(())
106    }
107}
108
109/// Status of an index rebuild task.
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub enum IndexRebuildStatus {
112    /// Task is waiting to be processed.
113    Pending,
114    /// Task is currently being processed.
115    InProgress,
116    /// Task completed successfully.
117    Completed,
118    /// Task failed with an error.
119    Failed,
120}
121
122/// A task representing an index rebuild operation.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct IndexRebuildTask {
125    /// Unique identifier for this task.
126    pub id: String,
127    /// The label for which indexes are being rebuilt.
128    pub label: String,
129    /// Current status of the task.
130    pub status: IndexRebuildStatus,
131    /// When the task was created.
132    pub created_at: DateTime<Utc>,
133    /// When the task started processing.
134    pub started_at: Option<DateTime<Utc>>,
135    /// When the task completed (successfully or with failure).
136    pub completed_at: Option<DateTime<Utc>>,
137    /// Error message if the task failed.
138    pub error: Option<String>,
139    /// Number of retry attempts.
140    pub retry_count: u32,
141}
142
143/// Manages physical and logical indexes across all vertex datasets.
144pub struct IndexManager {
145    base_uri: String,
146    schema_manager: Arc<SchemaManager>,
147    backend: Arc<dyn crate::backend::StorageBackend>,
148}
149
150impl std::fmt::Debug for IndexManager {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        f.debug_struct("IndexManager")
153            .field("base_uri", &self.base_uri)
154            .finish_non_exhaustive()
155    }
156}
157
158impl IndexManager {
159    /// Create a new `IndexManager` bound to `base_uri` and the given schema and backend.
160    pub fn new(
161        base_uri: &str,
162        schema_manager: Arc<SchemaManager>,
163        backend: Arc<dyn crate::backend::StorageBackend>,
164    ) -> Self {
165        Self {
166            base_uri: base_uri.to_string(),
167            schema_manager,
168            backend,
169        }
170    }
171
172    /// Build and persist an inverted index for set-membership queries.
173    #[cfg(feature = "lance-backend")]
174    #[instrument(skip(self), level = "info")]
175    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
176        let label = &config.label;
177        let property = &config.property;
178        info!(
179            "Creating Inverted Index '{}' on {}.{}",
180            config.name, label, property
181        );
182
183        let schema = self.schema_manager.schema();
184        let label_meta = schema
185            .labels
186            .get(label)
187            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
188
189        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
190
191        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
192
193        // Check if dataset exists
194        if ds.open_raw().await.is_ok() {
195            index
196                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
197                .await?;
198        } else {
199            warn!(
200                "Dataset for label '{}' not found, creating empty inverted index",
201                label
202            );
203        }
204
205        self.schema_manager
206            .add_index(IndexDefinition::Inverted(config))?;
207        self.schema_manager.save().await?;
208
209        Ok(())
210    }
211
212    /// Build and persist a vector (ANN) index on an embedding column.
213    #[cfg(feature = "lance-backend")]
214    #[instrument(skip(self), level = "info")]
215    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
216        let label = &config.label;
217        let property = &config.property;
218        info!(
219            "Creating vector index '{}' on {}.{}",
220            config.name, label, property
221        );
222
223        let schema = self.schema_manager.schema();
224        let label_meta = schema
225            .labels
226            .get(label)
227            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
228
229        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
230
231        match ds_wrapper.open_raw().await {
232            Ok(mut lance_ds) => {
233                let metric_type = match config.metric {
234                    DistanceMetric::L2 => MetricType::L2,
235                    DistanceMetric::Cosine => MetricType::Cosine,
236                    DistanceMetric::Dot => MetricType::Dot,
237                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
238                };
239
240                let params = match config.index_type {
241                    VectorIndexType::IvfPq {
242                        num_partitions,
243                        num_sub_vectors,
244                        bits_per_subvector,
245                    } => {
246                        let ivf = IvfBuildParams::new(num_partitions as usize);
247                        let pq = PQBuildParams::new(
248                            num_sub_vectors as usize,
249                            bits_per_subvector as usize,
250                        );
251                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
252                    }
253                    VectorIndexType::Hnsw {
254                        m,
255                        ef_construction,
256                        ef_search: _,
257                    } => {
258                        let ivf = IvfBuildParams::new(1);
259                        let hnsw = HnswBuildParams::default()
260                            .num_edges(m as usize)
261                            .ef_construction(ef_construction as usize);
262                        let sq = SQBuildParams::default();
263                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
264                    }
265                    VectorIndexType::Flat => {
266                        // Fallback to basic IVF-PQ
267                        let ivf = IvfBuildParams::new(1);
268                        let pq = PQBuildParams::default();
269                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
270                    }
271                    _ => {
272                        return Err(anyhow!(
273                            "Unsupported vector index type: {:?}",
274                            config.index_type
275                        ));
276                    }
277                };
278
279                // Ignore errors during creation if dataset is empty or similar, but try
280                let progress = TracingIndexProgress::arc(&config.name);
281                match lance_ds
282                    .create_index_builder(&[property], IndexType::Vector, &params)
283                    .name(config.name.clone())
284                    .replace(true)
285                    .progress(progress)
286                    .await
287                {
288                    Ok(metadata) => {
289                        info!(
290                            index_name = %metadata.name,
291                            index_uuid = %metadata.uuid,
292                            dataset_version = metadata.dataset_version,
293                            "Vector index created"
294                        );
295                    }
296                    Err(e) => {
297                        warn!(
298                            "Failed to create physical vector index (dataset might be empty): {}",
299                            e
300                        );
301                    }
302                }
303            }
304            Err(e) => {
305                warn!(
306                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
307                    label, e
308                );
309            }
310        }
311
312        self.schema_manager
313            .add_index(IndexDefinition::Vector(config))?;
314        self.schema_manager.save().await?;
315
316        Ok(())
317    }
318
319    /// Build and persist a scalar (BTree) index for exact-match and range queries.
320    #[cfg(feature = "lance-backend")]
321    #[instrument(skip(self), level = "info")]
322    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
323        let label = &config.label;
324        let properties = &config.properties;
325        info!(
326            "Creating scalar index '{}' on {}.{:?}",
327            config.name, label, properties
328        );
329
330        let schema = self.schema_manager.schema();
331        let label_meta = schema
332            .labels
333            .get(label)
334            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
335
336        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
337
338        match ds_wrapper.open_raw().await {
339            Ok(mut lance_ds) => {
340                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
341
342                let progress = TracingIndexProgress::arc(&config.name);
343                match lance_ds
344                    .create_index_builder(
345                        &columns,
346                        IndexType::Scalar,
347                        &ScalarIndexParams::default(),
348                    )
349                    .name(config.name.clone())
350                    .replace(true)
351                    .progress(progress)
352                    .await
353                {
354                    Ok(metadata) => {
355                        info!(
356                            index_name = %metadata.name,
357                            index_uuid = %metadata.uuid,
358                            dataset_version = metadata.dataset_version,
359                            "Scalar index created"
360                        );
361                    }
362                    Err(e) => {
363                        warn!(
364                            "Failed to create physical scalar index (dataset might be empty): {}",
365                            e
366                        );
367                    }
368                }
369            }
370            Err(e) => {
371                warn!(
372                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
373                    label, e
374                );
375            }
376        }
377
378        self.schema_manager
379            .add_index(IndexDefinition::Scalar(config))?;
380        self.schema_manager.save().await?;
381
382        Ok(())
383    }
384
385    /// Build and persist a full-text search (Lance inverted) index.
386    #[cfg(feature = "lance-backend")]
387    #[instrument(skip(self), level = "info")]
388    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
389        let label = &config.label;
390        info!(
391            "Creating FTS index '{}' on {}.{:?}",
392            config.name, label, config.properties
393        );
394
395        let schema = self.schema_manager.schema();
396        let label_meta = schema
397            .labels
398            .get(label)
399            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
400
401        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
402
403        match ds_wrapper.open_raw().await {
404            Ok(mut lance_ds) => {
405                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
406
407                let fts_params =
408                    InvertedIndexParams::default().with_position(config.with_positions);
409
410                let progress = TracingIndexProgress::arc(&config.name);
411                match lance_ds
412                    .create_index_builder(&columns, IndexType::Inverted, &fts_params)
413                    .name(config.name.clone())
414                    .replace(true)
415                    .progress(progress)
416                    .await
417                {
418                    Ok(metadata) => {
419                        info!(
420                            index_name = %metadata.name,
421                            index_uuid = %metadata.uuid,
422                            dataset_version = metadata.dataset_version,
423                            "FTS index created"
424                        );
425                    }
426                    Err(e) => {
427                        warn!(
428                            "Failed to create physical FTS index (dataset might be empty): {}",
429                            e
430                        );
431                    }
432                }
433            }
434            Err(e) => {
435                warn!(
436                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
437                    label, e
438                );
439            }
440        }
441
442        self.schema_manager
443            .add_index(IndexDefinition::FullText(config))?;
444        self.schema_manager.save().await?;
445
446        Ok(())
447    }
448
449    /// Creates a JSON Full-Text Search index on a column.
450    ///
451    /// This creates a Lance inverted index on the specified column,
452    /// enabling BM25-based full-text search with optional phrase matching.
453    #[cfg(feature = "lance-backend")]
454    #[instrument(skip(self), level = "info")]
455    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
456        let label = &config.label;
457        let column = &config.column;
458        info!(
459            "Creating JSON FTS index '{}' on {}.{}",
460            config.name, label, column
461        );
462
463        let schema = self.schema_manager.schema();
464        let label_meta = schema
465            .labels
466            .get(label)
467            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
468
469        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
470
471        match ds_wrapper.open_raw().await {
472            Ok(mut lance_ds) => {
473                let fts_params =
474                    InvertedIndexParams::default().with_position(config.with_positions);
475
476                let progress = TracingIndexProgress::arc(&config.name);
477                match lance_ds
478                    .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
479                    .name(config.name.clone())
480                    .replace(true)
481                    .progress(progress)
482                    .await
483                {
484                    Ok(metadata) => {
485                        info!(
486                            index_name = %metadata.name,
487                            index_uuid = %metadata.uuid,
488                            dataset_version = metadata.dataset_version,
489                            "JSON FTS index created"
490                        );
491                    }
492                    Err(e) => {
493                        warn!(
494                            "Failed to create physical JSON FTS index (dataset might be empty): {}",
495                            e
496                        );
497                    }
498                }
499            }
500            Err(e) => {
501                warn!(
502                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
503                    label, e
504                );
505            }
506        }
507
508        self.schema_manager
509            .add_index(IndexDefinition::JsonFullText(config))?;
510        self.schema_manager.save().await?;
511
512        Ok(())
513    }
514
515    /// Remove an index both physically from the Lance dataset and from the schema.
516    #[cfg(feature = "lance-backend")]
517    #[instrument(skip(self), level = "info")]
518    pub async fn drop_index(&self, name: &str) -> Result<()> {
519        info!("Dropping index '{}'", name);
520
521        let idx_def = self
522            .schema_manager
523            .get_index(name)
524            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
525
526        // Attempt physical index drop on the underlying Lance dataset.
527        let label = idx_def.label();
528        let schema = self.schema_manager.schema();
529        if let Some(label_meta) = schema.labels.get(label) {
530            let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
531            match ds_wrapper.open_raw().await {
532                Ok(mut lance_ds) => {
533                    if let Err(e) = lance_ds.drop_index(name).await {
534                        // Log but don't fail — the index may never have been
535                        // physically built (e.g. empty dataset at creation time).
536                        warn!(
537                            "Physical index drop for '{}' returned error (non-fatal): {}",
538                            name, e
539                        );
540                    } else {
541                        info!("Physical index '{}' dropped from Lance dataset", name);
542                    }
543                }
544                Err(e) => {
545                    debug!(
546                        "Could not open dataset for label '{}' to drop physical index: {}",
547                        label, e
548                    );
549                }
550            }
551        }
552
553        self.schema_manager.remove_index(name)?;
554        self.schema_manager.save().await?;
555        Ok(())
556    }
557
558    /// Rebuild all indexes registered for `label` from scratch.
559    #[cfg(feature = "lance-backend")]
560    #[instrument(skip(self), level = "info")]
561    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
562        info!("Rebuilding all indexes for label '{}'", label);
563        let schema = self.schema_manager.schema();
564
565        // Clone and filter to avoid holding lock while async awaiting
566        let indexes: Vec<_> = schema
567            .indexes
568            .iter()
569            .filter(|idx| idx.label() == label)
570            .cloned()
571            .collect();
572
573        for index in indexes {
574            match index {
575                IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
576                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
577                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
578                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
579                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
580                _ => warn!("Unknown index type encountered during rebuild, skipping"),
581            }
582        }
583        Ok(())
584    }
585
586    /// Create composite index for unique constraint
587    #[cfg(feature = "lance-backend")]
588    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
589        let schema = self.schema_manager.schema();
590        let label_meta = schema
591            .labels
592            .get(label)
593            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
594
595        // Lance supports multi-column indexes
596        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
597
598        // We need to verify dataset exists
599        if let Ok(mut ds) = ds_wrapper.open_raw().await {
600            // Create composite BTree index
601            let index_name = format!("{}_{}_composite", label, properties.join("_"));
602
603            // Convert properties to slice of &str
604            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
605
606            let progress = TracingIndexProgress::arc(&index_name);
607            match ds
608                .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
609                .name(index_name.clone())
610                .replace(true)
611                .progress(progress)
612                .await
613            {
614                Ok(metadata) => {
615                    info!(
616                        index_name = %metadata.name,
617                        index_uuid = %metadata.uuid,
618                        dataset_version = metadata.dataset_version,
619                        "Composite index created"
620                    );
621                }
622                Err(e) => {
623                    warn!("Failed to create physical composite index: {}", e);
624                }
625            }
626
627            let config = ScalarIndexConfig {
628                name: index_name,
629                label: label.to_string(),
630                properties: properties.to_vec(),
631                index_type: uni_common::core::schema::ScalarIndexType::BTree,
632                where_clause: None,
633                metadata: Default::default(),
634            };
635
636            self.schema_manager
637                .add_index(IndexDefinition::Scalar(config))?;
638            self.schema_manager.save().await?;
639        }
640
641        Ok(())
642    }
643
644    /// Lookup by composite key
645    pub async fn composite_lookup(
646        &self,
647        label: &str,
648        key_values: &HashMap<String, Value>,
649    ) -> Result<Option<Vid>> {
650        use crate::backend::types::ScanRequest;
651
652        let schema = self.schema_manager.schema();
653        let label_meta = schema
654            .labels
655            .get(label)
656            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
657
658        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
659        let table_name = ds_wrapper.table_name();
660        let backend = self.backend.as_ref();
661
662        if !backend.table_exists(&table_name).await.unwrap_or(false) {
663            return Ok(None);
664        }
665
666        // Build filter from key values
667        let filter = key_values
668            .iter()
669            .map(|(k, v)| {
670                // Issue #8: Validate column name to prevent SQL injection
671                if !is_valid_column_name(k) {
672                    anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
673                }
674
675                let val_str = match v {
676                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
677                    Value::Number(n) => n.to_string(),
678                    Value::Bool(b) => b.to_string(),
679                    Value::Null => "null".to_string(),
680                    _ => v.to_string(),
681                };
682                // Quote column name for case sensitivity
683                Ok(format!("\"{}\" = {}", k, val_str))
684            })
685            .collect::<Result<Vec<_>>>()?
686            .join(" AND ");
687
688        let request = ScanRequest::all(&table_name)
689            .with_filter(filter)
690            .with_limit(1)
691            .with_columns(vec!["_vid".to_string()]);
692
693        let batches = match backend.scan(request).await {
694            Ok(b) => b,
695            Err(_) => return Ok(None),
696        };
697
698        for batch in batches {
699            if batch.num_rows() > 0 {
700                let vid_col = batch
701                    .column_by_name("_vid")
702                    .ok_or_else(|| anyhow!("Missing _vid column"))?
703                    .as_any()
704                    .downcast_ref::<UInt64Array>()
705                    .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
706
707                let vid = vid_col.value(0);
708                return Ok(Some(Vid::from(vid)));
709            }
710        }
711
712        Ok(None)
713    }
714
715    /// Applies incremental updates to an inverted index.
716    ///
717    /// Instead of rebuilding the entire index, this method updates only the
718    /// changed entries, making it much faster for small mutations.
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the index doesn't exist or the update fails.
723    #[cfg(feature = "lance-backend")]
724    #[instrument(skip(self, added, removed), level = "info", fields(
725        label = %config.label,
726        property = %config.property
727    ))]
728    pub async fn update_inverted_index_incremental(
729        &self,
730        config: &InvertedIndexConfig,
731        added: &HashMap<Vid, Vec<String>>,
732        removed: &HashSet<Vid>,
733    ) -> Result<()> {
734        info!(
735            added = added.len(),
736            removed = removed.len(),
737            "Incrementally updating inverted index"
738        );
739
740        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
741        index.apply_incremental_updates(added, removed).await
742    }
743}