Skip to main content

golem_ai_search_algolia/
lib.rs

1use crate::client::AlgoliaSearchApi;
2use crate::conversions::{
3    algolia_object_to_doc, algolia_response_to_search_results, algolia_settings_to_schema,
4    create_retry_query, doc_to_algolia_object, schema_to_algolia_settings,
5    search_query_to_algolia_query,
6};
7use golem_ai_search::durability::{DurableSearch, ExtendedSearchProvider};
8use golem_ai_search::model::{CreateIndexOptions, SearchStream};
9use golem_ai_search::model::{
10    Doc, DocumentId, IndexName, Schema, SearchError, SearchHit, SearchQuery, SearchResults,
11};
12use golem_ai_search::wasi_compat::{subscribe_zero, Pollable};
13use golem_ai_search::{SearchProvider, SearchStreamInterface};
14use std::cell::{Cell, RefCell};
15
16mod client;
17pub mod config;
18mod conversions;
19
20pub use crate::config::AlgoliaConfig;
21#[cfg(feature = "golem")]
22pub use crate::config::AlgoliaHostConfig;
23
24pub struct AlgoliaSearchStream {
25    client: AlgoliaSearchApi,
26    index_name: String,
27    query: SearchQuery,
28    current_page: Cell<u32>,
29    finished: Cell<bool>,
30    last_response: RefCell<Option<SearchResults>>,
31}
32
33impl AlgoliaSearchStream {
34    pub fn new(client: AlgoliaSearchApi, index_name: String, query: SearchQuery) -> Self {
35        Self {
36            client,
37            index_name,
38            query: query.clone(),
39            current_page: Cell::new(query.page.unwrap_or(0)),
40            finished: Cell::new(false),
41            last_response: RefCell::new(None),
42        }
43    }
44
45    pub fn subscribe(&self) -> Pollable {
46        subscribe_zero()
47    }
48}
49
50impl SearchStreamInterface for AlgoliaSearchStream {
51    fn as_any(&self) -> &dyn std::any::Any {
52        self
53    }
54
55    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
56        self
57    }
58
59    fn get_next(&self) -> Option<Vec<SearchHit>> {
60        if self.finished.get() {
61            return Some(vec![]);
62        }
63
64        let mut search_query = self.query.clone();
65        search_query.page = Some(self.current_page.get());
66
67        let algolia_query = search_query_to_algolia_query(search_query);
68
69        match self.client.search(&self.index_name, &algolia_query) {
70            Ok(response) => {
71                let search_results = algolia_response_to_search_results(response);
72
73                let current_page = self.current_page.get();
74                let total_pages = if let (Some(total), Some(per_page)) =
75                    (search_results.total, search_results.per_page)
76                {
77                    total.div_ceil(per_page)
78                } else {
79                    current_page + 1
80                };
81
82                if current_page >= total_pages || search_results.hits.is_empty() {
83                    self.finished.set(true);
84                }
85
86                self.current_page.set(current_page + 1);
87
88                let hits = search_results.hits.clone();
89                *self.last_response.borrow_mut() = Some(search_results);
90
91                Some(hits)
92            }
93            Err(_) => {
94                self.finished.set(true);
95                Some(vec![])
96            }
97        }
98    }
99
100    fn blocking_get_next(&self) -> Vec<SearchHit> {
101        self.get_next().unwrap_or_default()
102    }
103}
104
105pub struct Algolia;
106
107impl SearchProvider for Algolia {
108    type SearchStream = AlgoliaSearchStream;
109    type ProviderConfig = AlgoliaConfig;
110
111    fn create_index(
112        _provider_config: Self::ProviderConfig,
113        _options: CreateIndexOptions,
114    ) -> Result<(), SearchError> {
115        // Algolia doesn't require explicit index creation - indices are created automatically
116        // when you first add documents.
117        // providers that don't support index creation should return unsupported.
118        Err(SearchError::Unsupported)
119    }
120
121    fn delete_index(
122        provider_config: Self::ProviderConfig,
123        name: IndexName,
124    ) -> Result<(), SearchError> {
125        let client = AlgoliaSearchApi::new(&provider_config);
126
127        match client.delete_index(&name) {
128            Ok(response) => {
129                let _ = response;
130                Ok(())
131            }
132            Err(e) => Err(e),
133        }
134    }
135
136    fn list_indexes(provider_config: Self::ProviderConfig) -> Result<Vec<IndexName>, SearchError> {
137        let client = AlgoliaSearchApi::new(&provider_config);
138
139        match client.list_indexes() {
140            Ok(response) => Ok(response.items.into_iter().map(|item| item.name).collect()),
141            Err(e) => Err(e),
142        }
143    }
144
145    fn upsert(
146        provider_config: Self::ProviderConfig,
147        index: IndexName,
148        doc: Doc,
149    ) -> Result<(), SearchError> {
150        let client = AlgoliaSearchApi::new(&provider_config);
151        let algolia_object = doc_to_algolia_object(doc).map_err(SearchError::InvalidQuery)?;
152
153        match client.save_object(&index, &algolia_object) {
154            Ok(response) => {
155                let _ = response;
156                Ok(())
157            }
158            Err(e) => Err(e),
159        }
160    }
161
162    fn upsert_many(
163        provider_config: Self::ProviderConfig,
164        index: IndexName,
165        docs: Vec<Doc>,
166    ) -> Result<(), SearchError> {
167        let client = AlgoliaSearchApi::new(&provider_config);
168        let mut algolia_objects = Vec::new();
169
170        for doc in docs {
171            let algolia_object = doc_to_algolia_object(doc).map_err(SearchError::InvalidQuery)?;
172            algolia_objects.push(algolia_object);
173        }
174
175        match client.save_objects(&index, &algolia_objects) {
176            Ok(response) => {
177                let _ = response;
178                Ok(())
179            }
180            Err(e) => Err(e),
181        }
182    }
183
184    fn delete(
185        provider_config: Self::ProviderConfig,
186        index: IndexName,
187        id: DocumentId,
188    ) -> Result<(), SearchError> {
189        let client = AlgoliaSearchApi::new(&provider_config);
190
191        match client.delete_object(&index, &id) {
192            Ok(response) => {
193                let _ = response;
194                Ok(())
195            }
196            Err(e) => Err(e),
197        }
198    }
199
200    fn delete_many(
201        provider_config: Self::ProviderConfig,
202        index: IndexName,
203        ids: Vec<DocumentId>,
204    ) -> Result<(), SearchError> {
205        let client = AlgoliaSearchApi::new(&provider_config);
206
207        match client.delete_objects(&index, &ids) {
208            Ok(response) => {
209                let _ = response;
210                Ok(())
211            }
212            Err(e) => Err(e),
213        }
214    }
215
216    fn get(
217        provider_config: Self::ProviderConfig,
218        index: IndexName,
219        id: DocumentId,
220    ) -> Result<Option<Doc>, SearchError> {
221        let client = AlgoliaSearchApi::new(&provider_config);
222
223        match client.get_object(&index, &id) {
224            Ok(Some(algolia_object)) => Ok(Some(algolia_object_to_doc(algolia_object))),
225            Ok(None) => Ok(None),
226            Err(e) => Err(e),
227        }
228    }
229
230    fn search(
231        provider_config: Self::ProviderConfig,
232        index: IndexName,
233        query: SearchQuery,
234    ) -> Result<SearchResults, SearchError> {
235        let client = AlgoliaSearchApi::new(&provider_config);
236        let algolia_query = search_query_to_algolia_query(query);
237
238        match client.search(&index, &algolia_query) {
239            Ok(response) => Ok(algolia_response_to_search_results(response)),
240            Err(e) => Err(e),
241        }
242    }
243
244    fn stream_search(
245        provider_config: Self::ProviderConfig,
246        index: IndexName,
247        query: SearchQuery,
248    ) -> Result<SearchStream, SearchError> {
249        let client = AlgoliaSearchApi::new(&provider_config);
250        let stream = AlgoliaSearchStream::new(client, index, query);
251        Ok(SearchStream::new(stream))
252    }
253
254    fn get_schema(
255        provider_config: Self::ProviderConfig,
256        index: IndexName,
257    ) -> Result<Schema, SearchError> {
258        let client = AlgoliaSearchApi::new(&provider_config);
259
260        match client.get_settings(&index) {
261            Ok(settings) => Ok(algolia_settings_to_schema(settings)),
262            Err(e) => Err(e),
263        }
264    }
265
266    fn update_schema(
267        provider_config: Self::ProviderConfig,
268        index: IndexName,
269        schema: Schema,
270    ) -> Result<(), SearchError> {
271        let client = AlgoliaSearchApi::new(&provider_config);
272        let settings = schema_to_algolia_settings(schema);
273
274        client.set_settings(&index, &settings)?;
275
276        Ok(())
277    }
278}
279
280impl ExtendedSearchProvider for Algolia {
281    fn unwrapped_stream(
282        provider_config: Self::ProviderConfig,
283        index: IndexName,
284        query: SearchQuery,
285    ) -> Self::SearchStream {
286        let client = AlgoliaSearchApi::new(&provider_config);
287        AlgoliaSearchStream::new(client, index, query)
288    }
289
290    fn retry_query(original_query: &SearchQuery, partial_hits: &[SearchHit]) -> SearchQuery {
291        create_retry_query(original_query, partial_hits)
292    }
293
294    fn subscribe(stream: &Self::SearchStream) -> Pollable {
295        stream.subscribe()
296    }
297}
298
299pub type DurableAlgolia = DurableSearch<Algolia>;