1use crate::client::AlgoliaSearchApi;
2use crate::conversions::{
3 algolia_object_to_doc, algolia_response_to_search_results, algolia_settings_to_schema,
4 create_retry_query, doc_to_algolia_object, schema_to_algolia_settings,
5 search_query_to_algolia_query,
6};
7use golem_ai_search::durability::{DurableSearch, ExtendedSearchProvider};
8use golem_ai_search::model::{CreateIndexOptions, SearchStream};
9use golem_ai_search::model::{
10 Doc, DocumentId, IndexName, Schema, SearchError, SearchHit, SearchQuery, SearchResults,
11};
12use golem_ai_search::wasi_compat::{subscribe_zero, Pollable};
13use golem_ai_search::{SearchProvider, SearchStreamInterface};
14use std::cell::{Cell, RefCell};
15
16mod client;
17pub mod config;
18mod conversions;
19
20pub use crate::config::AlgoliaConfig;
21#[cfg(feature = "golem")]
22pub use crate::config::AlgoliaHostConfig;
23
24pub struct AlgoliaSearchStream {
25 client: AlgoliaSearchApi,
26 index_name: String,
27 query: SearchQuery,
28 current_page: Cell<u32>,
29 finished: Cell<bool>,
30 last_response: RefCell<Option<SearchResults>>,
31}
32
33impl AlgoliaSearchStream {
34 pub fn new(client: AlgoliaSearchApi, index_name: String, query: SearchQuery) -> Self {
35 Self {
36 client,
37 index_name,
38 query: query.clone(),
39 current_page: Cell::new(query.page.unwrap_or(0)),
40 finished: Cell::new(false),
41 last_response: RefCell::new(None),
42 }
43 }
44
45 pub fn subscribe(&self) -> Pollable {
46 subscribe_zero()
47 }
48}
49
50impl SearchStreamInterface for AlgoliaSearchStream {
51 fn as_any(&self) -> &dyn std::any::Any {
52 self
53 }
54
55 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
56 self
57 }
58
59 fn get_next(&self) -> Option<Vec<SearchHit>> {
60 if self.finished.get() {
61 return Some(vec![]);
62 }
63
64 let mut search_query = self.query.clone();
65 search_query.page = Some(self.current_page.get());
66
67 let algolia_query = search_query_to_algolia_query(search_query);
68
69 match self.client.search(&self.index_name, &algolia_query) {
70 Ok(response) => {
71 let search_results = algolia_response_to_search_results(response);
72
73 let current_page = self.current_page.get();
74 let total_pages = if let (Some(total), Some(per_page)) =
75 (search_results.total, search_results.per_page)
76 {
77 total.div_ceil(per_page)
78 } else {
79 current_page + 1
80 };
81
82 if current_page >= total_pages || search_results.hits.is_empty() {
83 self.finished.set(true);
84 }
85
86 self.current_page.set(current_page + 1);
87
88 let hits = search_results.hits.clone();
89 *self.last_response.borrow_mut() = Some(search_results);
90
91 Some(hits)
92 }
93 Err(_) => {
94 self.finished.set(true);
95 Some(vec![])
96 }
97 }
98 }
99
100 fn blocking_get_next(&self) -> Vec<SearchHit> {
101 self.get_next().unwrap_or_default()
102 }
103}
104
/// Marker type that carries the Algolia implementation of the search provider traits.
pub struct Algolia;
106
107impl SearchProvider for Algolia {
108 type SearchStream = AlgoliaSearchStream;
109 type ProviderConfig = AlgoliaConfig;
110
111 fn create_index(
112 _provider_config: Self::ProviderConfig,
113 _options: CreateIndexOptions,
114 ) -> Result<(), SearchError> {
115 Err(SearchError::Unsupported)
119 }
120
121 fn delete_index(
122 provider_config: Self::ProviderConfig,
123 name: IndexName,
124 ) -> Result<(), SearchError> {
125 let client = AlgoliaSearchApi::new(&provider_config);
126
127 match client.delete_index(&name) {
128 Ok(response) => {
129 let _ = response;
130 Ok(())
131 }
132 Err(e) => Err(e),
133 }
134 }
135
136 fn list_indexes(provider_config: Self::ProviderConfig) -> Result<Vec<IndexName>, SearchError> {
137 let client = AlgoliaSearchApi::new(&provider_config);
138
139 match client.list_indexes() {
140 Ok(response) => Ok(response.items.into_iter().map(|item| item.name).collect()),
141 Err(e) => Err(e),
142 }
143 }
144
145 fn upsert(
146 provider_config: Self::ProviderConfig,
147 index: IndexName,
148 doc: Doc,
149 ) -> Result<(), SearchError> {
150 let client = AlgoliaSearchApi::new(&provider_config);
151 let algolia_object = doc_to_algolia_object(doc).map_err(SearchError::InvalidQuery)?;
152
153 match client.save_object(&index, &algolia_object) {
154 Ok(response) => {
155 let _ = response;
156 Ok(())
157 }
158 Err(e) => Err(e),
159 }
160 }
161
162 fn upsert_many(
163 provider_config: Self::ProviderConfig,
164 index: IndexName,
165 docs: Vec<Doc>,
166 ) -> Result<(), SearchError> {
167 let client = AlgoliaSearchApi::new(&provider_config);
168 let mut algolia_objects = Vec::new();
169
170 for doc in docs {
171 let algolia_object = doc_to_algolia_object(doc).map_err(SearchError::InvalidQuery)?;
172 algolia_objects.push(algolia_object);
173 }
174
175 match client.save_objects(&index, &algolia_objects) {
176 Ok(response) => {
177 let _ = response;
178 Ok(())
179 }
180 Err(e) => Err(e),
181 }
182 }
183
184 fn delete(
185 provider_config: Self::ProviderConfig,
186 index: IndexName,
187 id: DocumentId,
188 ) -> Result<(), SearchError> {
189 let client = AlgoliaSearchApi::new(&provider_config);
190
191 match client.delete_object(&index, &id) {
192 Ok(response) => {
193 let _ = response;
194 Ok(())
195 }
196 Err(e) => Err(e),
197 }
198 }
199
200 fn delete_many(
201 provider_config: Self::ProviderConfig,
202 index: IndexName,
203 ids: Vec<DocumentId>,
204 ) -> Result<(), SearchError> {
205 let client = AlgoliaSearchApi::new(&provider_config);
206
207 match client.delete_objects(&index, &ids) {
208 Ok(response) => {
209 let _ = response;
210 Ok(())
211 }
212 Err(e) => Err(e),
213 }
214 }
215
216 fn get(
217 provider_config: Self::ProviderConfig,
218 index: IndexName,
219 id: DocumentId,
220 ) -> Result<Option<Doc>, SearchError> {
221 let client = AlgoliaSearchApi::new(&provider_config);
222
223 match client.get_object(&index, &id) {
224 Ok(Some(algolia_object)) => Ok(Some(algolia_object_to_doc(algolia_object))),
225 Ok(None) => Ok(None),
226 Err(e) => Err(e),
227 }
228 }
229
230 fn search(
231 provider_config: Self::ProviderConfig,
232 index: IndexName,
233 query: SearchQuery,
234 ) -> Result<SearchResults, SearchError> {
235 let client = AlgoliaSearchApi::new(&provider_config);
236 let algolia_query = search_query_to_algolia_query(query);
237
238 match client.search(&index, &algolia_query) {
239 Ok(response) => Ok(algolia_response_to_search_results(response)),
240 Err(e) => Err(e),
241 }
242 }
243
244 fn stream_search(
245 provider_config: Self::ProviderConfig,
246 index: IndexName,
247 query: SearchQuery,
248 ) -> Result<SearchStream, SearchError> {
249 let client = AlgoliaSearchApi::new(&provider_config);
250 let stream = AlgoliaSearchStream::new(client, index, query);
251 Ok(SearchStream::new(stream))
252 }
253
254 fn get_schema(
255 provider_config: Self::ProviderConfig,
256 index: IndexName,
257 ) -> Result<Schema, SearchError> {
258 let client = AlgoliaSearchApi::new(&provider_config);
259
260 match client.get_settings(&index) {
261 Ok(settings) => Ok(algolia_settings_to_schema(settings)),
262 Err(e) => Err(e),
263 }
264 }
265
266 fn update_schema(
267 provider_config: Self::ProviderConfig,
268 index: IndexName,
269 schema: Schema,
270 ) -> Result<(), SearchError> {
271 let client = AlgoliaSearchApi::new(&provider_config);
272 let settings = schema_to_algolia_settings(schema);
273
274 client.set_settings(&index, &settings)?;
275
276 Ok(())
277 }
278}
279
280impl ExtendedSearchProvider for Algolia {
281 fn unwrapped_stream(
282 provider_config: Self::ProviderConfig,
283 index: IndexName,
284 query: SearchQuery,
285 ) -> Self::SearchStream {
286 let client = AlgoliaSearchApi::new(&provider_config);
287 AlgoliaSearchStream::new(client, index, query)
288 }
289
290 fn retry_query(original_query: &SearchQuery, partial_hits: &[SearchHit]) -> SearchQuery {
291 create_retry_query(original_query, partial_hits)
292 }
293
294 fn subscribe(stream: &Self::SearchStream) -> Pollable {
295 stream.subscribe()
296 }
297}
298
299pub type DurableAlgolia = DurableSearch<Algolia>;