lb_rs/service/
search.rs

1use super::activity::RankingWeights;
2use super::events::Event;
3use crate::model::errors::{LbErr, LbErrKind, LbResult, UnexpectedError};
4use crate::model::filename::DocumentType;
5use crate::Lb;
6use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
7use serde::Serialize;
8use std::collections::HashMap;
9use std::num::NonZeroUsize;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::thread;
13use std::time::Duration;
14use sublime_fuzzy::{FuzzySearch, Scoring};
15use tokio::sync::RwLock;
16use uuid::Uuid;
17
/// Minimum weighted fuzzy score a paragraph needs to be reported as a content match.
const CONTENT_SCORE_THRESHOLD: i64 = 170;
/// Minimum weighted fuzzy score a path needs to be reported as a path match.
const PATH_SCORE_THRESHOLD: i64 = 10;
/// Documents whose size in bytes reaches this limit are indexed without their content.
const CONTENT_MAX_LEN_BYTES: usize = 128 << 10; // 128 KiB

/// Upper bound on the size of a trimmed content-match snippet.
const MAX_CONTENT_MATCH_LENGTH: usize = 400;
/// Preferred size of a trimmed content-match snippet.
const IDEAL_CONTENT_MATCH_LENGTH: usize = 150;
/// Context kept around the matched region when trimming a snippet.
const CONTENT_MATCH_PADDING: usize = 8;

/// Multiplier applied to raw fuzzy scores (after they are capped at 600).
const FUZZY_WEIGHT: f32 = 0.8;
27
28#[derive(Clone, Default)]
29pub struct SearchIndex {
30    pub building_index: Arc<AtomicBool>,
31    pub index: Arc<RwLock<Vec<SearchIndexEntry>>>,
32}
33
34#[derive(Debug)]
35pub struct SearchIndexEntry {
36    pub id: Uuid,
37    pub path: String,
38    pub content: Option<String>,
39}
40
/// Selects which parts of the index a search consults.
#[derive(Debug, Clone, Copy)]
pub enum SearchConfig {
    /// Match against file paths only.
    Paths,
    /// Match against document contents only.
    Documents,
    /// Match against both file paths and document contents.
    PathsAndDocuments,
}
47
48#[derive(Debug)]
49pub enum SearchResult {
50    DocumentMatch { id: Uuid, path: String, content_matches: Vec<ContentMatch> },
51    PathMatch { id: Uuid, path: String, matched_indices: Vec<usize>, score: i64 },
52}
53
54impl SearchResult {
55    pub fn id(&self) -> Uuid {
56        match self {
57            SearchResult::DocumentMatch { id, .. } | SearchResult::PathMatch { id, .. } => *id,
58        }
59    }
60
61    pub fn path(&self) -> &str {
62        match self {
63            SearchResult::DocumentMatch { path, .. } | SearchResult::PathMatch { path, .. } => path,
64        }
65    }
66
67    pub fn name(&self) -> &str {
68        match self {
69            SearchResult::DocumentMatch { path, .. } | SearchResult::PathMatch { path, .. } => {
70                path.split('/').last().unwrap_or_default()
71            }
72        }
73    }
74
75    pub fn score(&self) -> i64 {
76        match self {
77            SearchResult::DocumentMatch { content_matches, .. } => content_matches
78                .iter()
79                .map(|m| m.score)
80                .max()
81                .unwrap_or_default(),
82            SearchResult::PathMatch { score, .. } => *score,
83        }
84    }
85}
86
87impl Lb {
88    #[instrument(level = "debug", skip(self), err(Debug))]
89    pub async fn search(&self, input: &str, cfg: SearchConfig) -> LbResult<Vec<SearchResult>> {
90        // for cli style invocations nothing will have built the search index yet
91        if !self.config.background_work {
92            self.build_index().await?;
93        }
94
95        // show suggested docs if the input string is empty
96        if input.is_empty() {
97            match cfg {
98                SearchConfig::Paths | SearchConfig::PathsAndDocuments => {
99                    return stream::iter(self.suggested_docs(RankingWeights::default()).await?)
100                        .then(|id| async move {
101                            Ok(SearchResult::PathMatch {
102                                id,
103                                path: self.get_path_by_id(id).await?,
104                                matched_indices: vec![],
105                                score: 0,
106                            })
107                        })
108                        .try_collect()
109                        .await;
110                }
111                SearchConfig::Documents => return Ok(vec![]),
112            }
113        }
114
115        // if the index is empty wait patiently for it become available
116        let mut retries = 0;
117        loop {
118            if self.search.index.read().await.is_empty() {
119                warn!("search index was empty, waiting 50ms");
120                tokio::time::sleep(Duration::from_millis(50)).await;
121                retries += 1;
122
123                if retries == 20 {
124                    error!("could not aquire search index after 20x(50ms) retries.");
125                    return Err(LbErr::from(LbErrKind::Unexpected(
126                        "failed to search, index not available".to_string(),
127                    )));
128                }
129            } else {
130                break;
131            }
132        }
133
134        let mut results = match cfg {
135            SearchConfig::Paths => self.search.search_paths(input).await?,
136            SearchConfig::Documents => self.search.search_content(input).await?,
137            SearchConfig::PathsAndDocuments => {
138                let (paths, docs) = tokio::join!(
139                    self.search.search_paths(input),
140                    self.search.search_content(input)
141                );
142                paths?.into_iter().chain(docs?.into_iter()).collect()
143            }
144        };
145
146        results.sort_unstable_by_key(|r| -r.score());
147        results.truncate(10);
148
149        Ok(results)
150    }
151
152    #[instrument(level = "debug", skip(self), err(Debug))]
153    pub async fn build_index(&self) -> LbResult<()> {
154        // if we haven't signed in yet, we'll leave our index entry and our event subscriber will
155        // handle the state change
156        if self.keychain.get_account().is_err() {
157            return Ok(());
158        }
159
160        // some other caller has already built this index, subscriber will keep it up to date
161        if self.search.building_index.swap(true, Ordering::AcqRel) {
162            return Ok(());
163        }
164
165        let mut tasks = vec![];
166        for file in self.list_metadatas().await? {
167            let id = file.id;
168            let is_doc_searchable =
169                DocumentType::from_file_name_using_extension(&file.name) == DocumentType::Text;
170
171            tasks.push(async move {
172                let (path, content) = if is_doc_searchable {
173                    let (path, doc) =
174                        tokio::join!(self.get_path_by_id(id), self.read_document(id, false));
175
176                    let path = path?;
177
178                    let doc = doc?;
179                    let doc = if doc.len() >= CONTENT_MAX_LEN_BYTES {
180                        None
181                    } else {
182                        Some(String::from_utf8_lossy(&doc).to_string())
183                    };
184
185                    (path, doc)
186                } else {
187                    (self.get_path_by_id(id).await?, None)
188                };
189
190                Ok::<SearchIndexEntry, LbErr>(SearchIndexEntry { id, path, content })
191            });
192        }
193
194        let mut results = stream::iter(tasks).buffer_unordered(
195            thread::available_parallelism()
196                .unwrap_or(NonZeroUsize::new(4).unwrap())
197                .into(),
198        );
199        let mut replacement_index = vec![];
200        while let Some(res) = results.next().await {
201            replacement_index.push(res?);
202        }
203
204        // swap in replacement index (index lock)
205        *self.search.index.write().await = replacement_index;
206
207        Ok(())
208    }
209
210    #[instrument(level = "debug", skip(self))]
211    pub fn setup_search(&self) {
212        if self.config.background_work {
213            let lb = self.clone();
214            let mut rx = self.subscribe();
215            tokio::spawn(async move {
216                lb.build_index().await.unwrap();
217                loop {
218                    let evt = match rx.recv().await {
219                        Ok(evt) => evt,
220                        Err(err) => {
221                            error!("failed to receive from a channel {err}");
222                            return;
223                        }
224                    };
225
226                    match evt {
227                        Event::MetadataChanged(mut id) => {
228                            // if this file is deleted recompute all our metadata
229                            if lb.get_file_by_id(id).await.is_err() {
230                                id = lb.root().await.unwrap().id;
231                            }
232
233                            // compute info for this update up-front
234                            let files = lb.list_metadatas().await.unwrap();
235                            let all_file_ids: Vec<Uuid> = files.into_iter().map(|f| f.id).collect();
236                            let children = lb.get_and_get_children_recursively(&id).await.unwrap();
237                            let mut paths = HashMap::new();
238                            for child in children {
239                                // todo: ideally this would be a single efficient core call
240                                paths.insert(child.id, lb.get_path_by_id(child.id).await.unwrap());
241                            }
242
243                            // aquire the lock
244                            let mut index = lb.search.index.write().await;
245
246                            // handle deletions
247                            index.retain(|entry| all_file_ids.contains(&entry.id));
248
249                            // update any of the paths of this file and the children
250                            for entry in index.iter_mut() {
251                                if paths.contains_key(&entry.id) {
252                                    entry.path = paths.remove(&entry.id).unwrap();
253                                }
254                            }
255
256                            // handle any remaining, new metadata
257                            for (id, path) in paths {
258                                // any content should come in as a result of DocumentWritten
259                                index.push(SearchIndexEntry { id, path, content: None });
260                            }
261                        }
262
263                        Event::DocumentWritten(id) => {
264                            let file = lb.get_file_by_id(id).await.unwrap();
265                            let is_searchable =
266                                DocumentType::from_file_name_using_extension(&file.name)
267                                    == DocumentType::Text;
268
269                            let doc = lb.read_document(id, false).await.unwrap();
270                            let doc = if doc.len() >= CONTENT_MAX_LEN_BYTES || !is_searchable {
271                                None
272                            } else {
273                                Some(String::from_utf8_lossy(&doc).to_string())
274                            };
275
276                            let mut index = lb.search.index.write().await;
277                            let mut found = false;
278                            // todo: consider warn! when doc not found
279                            for entries in index.iter_mut() {
280                                if entries.id == id {
281                                    entries.content = doc;
282                                    found = true;
283                                    break;
284                                }
285                            }
286
287                            if !found {
288                                warn!("could {file:?} not insert doc into index");
289                            }
290                        }
291                    };
292                }
293            });
294        }
295    }
296}
297
298impl SearchIndex {
299    async fn search_paths(&self, input: &str) -> LbResult<Vec<SearchResult>> {
300        let docs_guard = self.index.read().await; // read lock held for the whole fn
301
302        let mut results = Vec::new();
303        for doc in docs_guard.iter() {
304            if let Some(p_match) = FuzzySearch::new(input, &doc.path)
305                .case_insensitive()
306                .score_with(&Scoring::emphasize_distance())
307                .best_match()
308            {
309                let score = (p_match.score().min(600) as f32 * FUZZY_WEIGHT) as i64;
310
311                if score > PATH_SCORE_THRESHOLD {
312                    results.push(SearchResult::PathMatch {
313                        id: doc.id,
314                        path: doc.path.clone(),
315                        matched_indices: p_match.matched_indices().cloned().collect(),
316                        score,
317                    });
318                }
319            }
320        }
321        Ok(results)
322    }
323
324    async fn search_content(&self, input: &str) -> LbResult<Vec<SearchResult>> {
325        let search_futures = FuturesUnordered::new();
326        let docs = self.index.read().await;
327
328        for (idx, _) in docs.iter().enumerate() {
329            search_futures.push(async move {
330                let doc = &self.index.read().await[idx];
331                let id = doc.id;
332                let path = &doc.path;
333                let content = &doc.content;
334                if let Some(content) = content {
335                    let mut content_matches = Vec::new();
336
337                    for paragraph in content.split("\n\n") {
338                        if let Some(c_match) = FuzzySearch::new(input, paragraph)
339                            .case_insensitive()
340                            .score_with(&Scoring::emphasize_distance())
341                            .best_match()
342                        {
343                            let score = (c_match.score().min(600) as f32 * FUZZY_WEIGHT) as i64;
344                            let (paragraph, matched_indices) = match Self::optimize_searched_text(
345                                paragraph,
346                                c_match.matched_indices().cloned().collect(),
347                            ) {
348                                Ok((paragraph, matched_indices)) => (paragraph, matched_indices),
349                                Err(_) => continue,
350                            };
351
352                            if score > CONTENT_SCORE_THRESHOLD {
353                                content_matches.push(ContentMatch {
354                                    paragraph,
355                                    matched_indices,
356                                    score,
357                                });
358                            }
359                        }
360                    }
361
362                    if !content_matches.is_empty() {
363                        return Some(SearchResult::DocumentMatch {
364                            id,
365                            path: path.clone(),
366                            content_matches,
367                        });
368                    }
369                }
370                None
371            });
372        }
373
374        Ok(search_futures
375            .collect::<Vec<Option<SearchResult>>>()
376            .await
377            .into_iter()
378            .flatten()
379            .collect::<Vec<SearchResult>>())
380    }
381
382    fn optimize_searched_text(
383        paragraph: &str, matched_indices: Vec<usize>,
384    ) -> Result<(String, Vec<usize>), UnexpectedError> {
385        if paragraph.len() <= IDEAL_CONTENT_MATCH_LENGTH {
386            return Ok((paragraph.to_string(), matched_indices));
387        }
388
389        let mut index_offset: usize = 0;
390        let mut new_paragraph = paragraph.to_string();
391        let mut new_indices = matched_indices;
392
393        let first_match = new_indices.first().ok_or_else(|| {
394            warn!("A fuzzy match happened but there are no matched indices.");
395            UnexpectedError::new("No matched indices.".to_string())
396        })?;
397
398        let last_match = new_indices.last().ok_or_else(|| {
399            warn!("A fuzzy match happened but there are no matched indices.");
400            UnexpectedError::new("No matched indices.".to_string())
401        })?;
402
403        if *last_match < IDEAL_CONTENT_MATCH_LENGTH {
404            new_paragraph = new_paragraph
405                .chars()
406                .take(IDEAL_CONTENT_MATCH_LENGTH + CONTENT_MATCH_PADDING)
407                .chain("...".chars())
408                .collect();
409        } else {
410            if *first_match > CONTENT_MATCH_PADDING {
411                let at_least_take = new_paragraph.len() - first_match + CONTENT_MATCH_PADDING;
412
413                let deleted_chars_len = if at_least_take > IDEAL_CONTENT_MATCH_LENGTH {
414                    first_match - CONTENT_MATCH_PADDING
415                } else {
416                    new_paragraph.len() - IDEAL_CONTENT_MATCH_LENGTH
417                };
418
419                index_offset = deleted_chars_len - 3;
420
421                new_paragraph = "..."
422                    .chars()
423                    .chain(new_paragraph.chars().skip(deleted_chars_len))
424                    .collect();
425            }
426
427            if new_paragraph.len() > IDEAL_CONTENT_MATCH_LENGTH + CONTENT_MATCH_PADDING + 3 {
428                let at_least_take = *last_match - index_offset + CONTENT_MATCH_PADDING;
429
430                let take_chars_len = if at_least_take > IDEAL_CONTENT_MATCH_LENGTH {
431                    at_least_take
432                } else {
433                    IDEAL_CONTENT_MATCH_LENGTH
434                };
435
436                new_paragraph = new_paragraph
437                    .chars()
438                    .take(take_chars_len)
439                    .chain("...".chars())
440                    .collect();
441            }
442
443            if new_paragraph.len() > MAX_CONTENT_MATCH_LENGTH {
444                new_paragraph = new_paragraph
445                    .chars()
446                    .take(MAX_CONTENT_MATCH_LENGTH)
447                    .chain("...".chars())
448                    .collect();
449
450                new_indices.retain(|index| (*index - index_offset) < MAX_CONTENT_MATCH_LENGTH)
451            }
452        }
453
454        Ok((
455            new_paragraph,
456            new_indices
457                .iter()
458                .map(|index| *index - index_offset)
459                .collect(),
460        ))
461    }
462}
463
464#[derive(Debug, Serialize)]
465pub struct ContentMatch {
466    pub paragraph: String,
467    pub matched_indices: Vec<usize>,
468    pub score: i64,
469}