cognite/api/data_ingestion/
raw.rs

1use std::collections::VecDeque;
2
3use futures::stream::{try_unfold, SelectAll};
4use futures::{FutureExt, StreamExt, TryStream, TryStreamExt};
5
6use crate::api::resource::Resource;
7use crate::dto::items::Items;
8use crate::error::Result;
9use crate::{raw::*, CursorState, CursorStreamState};
10use crate::{Cursor, ItemsVec, LimitCursorQuery};
11
12/// Raw is a NoSQL JSON store. Each project can have a variable number of databases,
13/// each of which will have a variable number of tables, each of which will have a variable
14/// number of key-value objects. Only queries on key are supported through this API.
15pub type RawResource = Resource<RawRow>;
16
17impl RawResource {
18    /// List Raw databases in the project.
19    ///
20    /// # Arguments
21    ///
22    /// * `limit` - Maximum number of databases to retrieve.
23    /// * `cursor` - Optional cursor for pagination.
24    pub async fn list_databases(
25        &self,
26        limit: Option<i32>,
27        cursor: Option<String>,
28    ) -> Result<ItemsVec<Database, Cursor>> {
29        let query = LimitCursorQuery { limit, cursor };
30        self.api_client
31            .get_with_params("raw/dbs", Some(query))
32            .await
33    }
34
35    /// Create a list of Raw databases.
36    ///
37    /// # Arguments
38    ///
39    /// * `dbs` - Databases to create.
40    pub async fn create_databases(&self, dbs: &[Database]) -> Result<Vec<Database>> {
41        let items = Items::new(dbs);
42        let result: ItemsVec<Database, Cursor> = self.api_client.post("raw/dbs", &items).await?;
43        Ok(result.items)
44    }
45
46    /// Delete a list of raw databases.
47    ///
48    /// # Arguments
49    ///
50    /// * `to_delete` - Request describing which databases to delete and how.
51    pub async fn delete_databases(&self, to_delete: &DeleteDatabasesRequest) -> Result<()> {
52        self.api_client
53            .post::<::serde_json::Value, DeleteDatabasesRequest>("raw/dbs/delete", to_delete)
54            .await?;
55        Ok(())
56    }
57
58    /// List tables in a a raw database.
59    ///
60    /// # Arguments
61    ///
62    /// * `db_name` - Database to list tables in.
63    /// * `limit` - Maximum number of tables to retrieve.
64    /// * `cursor` - Optional cursor for pagination.
65    pub async fn list_tables(
66        &self,
67        db_name: &str,
68        limit: Option<i32>,
69        cursor: Option<String>,
70    ) -> Result<ItemsVec<Table, Cursor>> {
71        let query = LimitCursorQuery { limit, cursor };
72        let path = format!("raw/dbs/{db_name}/tables");
73        self.api_client.get_with_params(&path, Some(query)).await
74    }
75
76    /// Create tables in a raw database.
77    ///
78    /// # Arguments
79    ///
80    /// * `db_name` - Database to create tables in.
81    /// * `ensure_parent` - If this is set to `true`, create database if it doesn't already exist.
82    /// * `tables` - Tables to create.
83    pub async fn create_tables(
84        &self,
85        db_name: &str,
86        ensure_parent: bool,
87        tables: &[Table],
88    ) -> Result<Vec<Table>> {
89        let query = EnsureParentQuery {
90            ensure_parent: Some(ensure_parent),
91        };
92        let path = format!("raw/dbs/{db_name}/tables");
93        let items = Items::new(tables);
94        let result: ItemsVec<Table, Cursor> = self
95            .api_client
96            .post_with_query(&path, &items, Some(query))
97            .await?;
98        Ok(result.items)
99    }
100
101    /// Delete tables in a raw database.
102    ///
103    /// # Arguments
104    ///
105    /// * `db_name` - Database to delete tables from.
106    /// * `to_delete` - Tables to delete.
107    pub async fn delete_tables(&self, db_name: &str, to_delete: &[Table]) -> Result<()> {
108        let path = format!("raw/dbs/{db_name}/tables/delete");
109        let items = Items::new(to_delete);
110        self.api_client
111            .post::<::serde_json::Value, _>(&path, &items)
112            .await?;
113        Ok(())
114    }
115
116    /// Retrieve cursors for parallel reads. This can be used to efficiently download
117    /// large volumes of data from a raw table in parallel.
118    ///
119    /// # Arguments
120    ///
121    /// * `db_name` - Database to retrieve from.
122    /// * `table_name` - Table to retrieve from.
123    /// * `params` - Optional filter parameters.
124    pub async fn retrieve_cursors_for_parallel_reads(
125        &self,
126        db_name: &str,
127        table_name: &str,
128        params: Option<RetrieveCursorsQuery>,
129    ) -> Result<Vec<String>> {
130        let path = format!("raw/dbs/{db_name}/tables/{table_name}/cursors");
131        let result: ItemsVec<String, Cursor> =
132            self.api_client.get_with_params(&path, params).await?;
133        Ok(result.items)
134    }
135
136    /// Retrieve rows from a table, with some basic filtering options.
137    ///
138    /// # Arguments
139    ///
140    /// * `db_name` - Database to retrieve rows from.
141    /// * `table_name` - Table to retrieve rows from.
142    /// * `params` - Optional filter parameters.
143    pub async fn retrieve_rows(
144        &self,
145        db_name: &str,
146        table_name: &str,
147        params: Option<RetrieveRowsQuery>,
148    ) -> Result<ItemsVec<RawRow, Cursor>> {
149        let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows");
150        self.api_client.get_with_params(&path, params).await
151    }
152
153    /// Retrieve all rows from a table, following cursors. This returns a stream, you can abort the stream whenever you
154    /// want and only resources retrieved up to that point will be returned.
155    ///
156    /// Each item in the stream will be a result, after the first error is returned the
157    /// stream will end.
158    ///
159    /// `limit` in the filter only affects how many rows are returned _per request_.
160    ///
161    /// # Arguments
162    ///
163    /// * `db_name` - Database to retrieve rows from.
164    /// * `table_name` - Table to retrieve rows from.
165    /// * `params` - Optional filter parameters. This can set a cursor to start streaming from there.
166    pub fn retrieve_all_rows_stream<'a>(
167        &'a self,
168        db_name: &'a str,
169        table_name: &'a str,
170        params: Option<RetrieveRowsQuery>,
171    ) -> impl TryStream<Ok = RawRow, Error = crate::Error, Item = Result<RawRow>> + Send + 'a {
172        let req = params.unwrap_or_default();
173        let initial_state = match &req.cursor {
174            Some(p) => CursorState::Some(p.to_owned()),
175            None => CursorState::Initial,
176        };
177        let state = CursorStreamState {
178            req,
179            responses: VecDeque::new(),
180            next_cursor: initial_state,
181        };
182
183        try_unfold(state, move |mut state| async move {
184            if let Some(next) = state.responses.pop_front() {
185                Ok(Some((next, state)))
186            } else {
187                let cursor = match std::mem::take(&mut state.next_cursor) {
188                    CursorState::Initial => None,
189                    CursorState::Some(x) => Some(x),
190                    CursorState::End => {
191                        return Ok(None);
192                    }
193                };
194                state.req.cursor = cursor;
195                let response = self
196                    .retrieve_rows(db_name, table_name, Some(state.req.clone()))
197                    .await?;
198
199                state.responses.extend(response.items);
200                state.next_cursor = match response.extra_fields.next_cursor {
201                    Some(x) => CursorState::Some(x),
202                    None => CursorState::End,
203                };
204                if let Some(next) = state.responses.pop_front() {
205                    Ok(Some((next, state)))
206                } else {
207                    Ok(None)
208                }
209            }
210        })
211    }
212
213    /// Retrieve all rows from a table, following cursors.
214    ///
215    /// `limit` in the filter only affects how many rows are returned _per request_.
216    ///
217    /// # Arguments
218    ///
219    /// * `db_name` - Database to retrieve rows from.
220    /// * `table_name` - Table to retrieve rows from.
221    /// * `params` - Optional filter parameters. This can set a cursor to start reading from there.
222    pub async fn retrieve_all_rows(
223        &self,
224        db_name: &str,
225        table_name: &str,
226        params: Option<RetrieveRowsQuery>,
227    ) -> Result<Vec<RawRow>> {
228        self.retrieve_all_rows_stream(db_name, table_name, params)
229            .try_collect()
230            .await
231    }
232
233    /// Retrieve all rows from a table, following cursors and reading from multiple streams in parallel.
234    ///
235    /// The order of the returned values is not guaranteed to be in any way consistent.
236    ///
237    /// * `db_name` - Database to retrieve rows from.
238    /// * `table_name` - Table to retrieve rows from.
239    /// * `params` - Optional filter parameters.
240    pub async fn retrieve_all_rows_partitioned(
241        &self,
242        db_name: &str,
243        table_name: &str,
244        params: RetrieveAllPartitionedQuery,
245    ) -> Result<Vec<RawRow>> {
246        self.retrieve_all_rows_partitioned_stream(db_name, table_name, params)
247            .try_collect()
248            .await
249    }
250
251    /// Retrieve all rows from a table, following cursors and reading from multiple streams in parallel.
252    ///
253    /// The order of the returned values is not guaranteed to be in any way consistent.
254    ///
255    /// * `db_name` - Database to retrieve rows from.
256    /// * `table_name` - Table to retrieve rows from.
257    /// * `params` - Optional filter parameters.
258    pub fn retrieve_all_rows_partitioned_stream<'a>(
259        &'a self,
260        db_name: &'a str,
261        table_name: &'a str,
262        params: RetrieveAllPartitionedQuery,
263    ) -> impl TryStream<Ok = RawRow, Error = crate::Error, Item = Result<RawRow>> + Send + 'a {
264        self.retrieve_cursors_for_parallel_reads(
265            db_name,
266            table_name,
267            Some(RetrieveCursorsQuery {
268                min_last_updated_time: params.min_last_updated_time,
269                max_last_updated_time: params.max_last_updated_time,
270                number_of_cursors: params.number_of_cursors,
271            }),
272        )
273        .into_stream()
274        .map_ok(move |cursors| {
275            let mut streams = SelectAll::new();
276            for cursor in cursors {
277                let query = RetrieveRowsQuery {
278                    limit: params.limit,
279                    columns: params.columns.clone(),
280                    cursor: Some(cursor),
281                    min_last_updated_time: params.min_last_updated_time,
282                    max_last_updated_time: params.max_last_updated_time,
283                };
284                streams.push(
285                    self.retrieve_all_rows_stream(db_name, table_name, Some(query))
286                        .boxed(),
287                );
288            }
289            streams
290        })
291        .try_flatten()
292    }
293
294    /// Insert rows into a table.
295    ///
296    /// If `ensure_parent` is true, create the database and/or table if they do not exist.
297    ///
298    /// # Arguments
299    ///
300    /// * `db_name` - Database to insert rows into.
301    /// * `table_name` - Table to insert rows into.
302    /// * `ensure_parent` - Create database and/or table if they do not exist.
303    /// * `rows` - Raw rows to create.
304    pub async fn insert_rows(
305        &self,
306        db_name: &str,
307        table_name: &str,
308        ensure_parent: bool,
309        rows: &[RawRowCreate],
310    ) -> Result<()> {
311        let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows");
312        let query = EnsureParentQuery {
313            ensure_parent: Some(ensure_parent),
314        };
315        let items = Items::new(rows);
316        self.api_client
317            .post_with_query::<::serde_json::Value, _, EnsureParentQuery>(
318                &path,
319                &items,
320                Some(query),
321            )
322            .await?;
323        Ok(())
324    }
325
326    /// Retrieve a single row from a raw table.
327    ///
328    /// # Arguments
329    ///
330    /// * `db_name` - Database to retrieve from.
331    /// * `table_name` - Table to retrieve from.
332    /// * `key` - Key of row to retrieve.
333    pub async fn retrieve_row(&self, db_name: &str, table_name: &str, key: &str) -> Result<RawRow> {
334        let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows/{key}");
335        self.api_client.get(&path).await
336    }
337
338    /// Delete rows from a raw table.
339    ///
340    /// # Arguments
341    ///
342    /// * `db_name` - Database to delete from.
343    /// * `table_name` - Table to delete from.
344    /// * `to_delete` - Rows to delete.
345    pub async fn delete_rows(
346        &self,
347        db_name: &str,
348        table_name: &str,
349        to_delete: &[DeleteRow],
350    ) -> Result<()> {
351        let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows/delete");
352        let items = Items::new(to_delete);
353        self.api_client
354            .post::<::serde_json::Value, _>(&path, &items)
355            .await?;
356        Ok(())
357    }
358}