cognite/api/data_ingestion/
raw.rs1use std::collections::VecDeque;
2
3use futures::stream::{try_unfold, SelectAll};
4use futures::{FutureExt, StreamExt, TryStream, TryStreamExt};
5
6use crate::api::resource::Resource;
7use crate::dto::items::Items;
8use crate::error::Result;
9use crate::{raw::*, CursorState, CursorStreamState};
10use crate::{Cursor, ItemsVec, LimitCursorQuery};
11
/// Resource handle for the `raw/dbs` endpoints: a [`Resource`] specialised to [`RawRow`].
pub type RawResource = Resource<RawRow>;
16
17impl RawResource {
18 pub async fn list_databases(
25 &self,
26 limit: Option<i32>,
27 cursor: Option<String>,
28 ) -> Result<ItemsVec<Database, Cursor>> {
29 let query = LimitCursorQuery { limit, cursor };
30 self.api_client
31 .get_with_params("raw/dbs", Some(query))
32 .await
33 }
34
35 pub async fn create_databases(&self, dbs: &[Database]) -> Result<Vec<Database>> {
41 let items = Items::new(dbs);
42 let result: ItemsVec<Database, Cursor> = self.api_client.post("raw/dbs", &items).await?;
43 Ok(result.items)
44 }
45
46 pub async fn delete_databases(&self, to_delete: &DeleteDatabasesRequest) -> Result<()> {
52 self.api_client
53 .post::<::serde_json::Value, DeleteDatabasesRequest>("raw/dbs/delete", to_delete)
54 .await?;
55 Ok(())
56 }
57
58 pub async fn list_tables(
66 &self,
67 db_name: &str,
68 limit: Option<i32>,
69 cursor: Option<String>,
70 ) -> Result<ItemsVec<Table, Cursor>> {
71 let query = LimitCursorQuery { limit, cursor };
72 let path = format!("raw/dbs/{db_name}/tables");
73 self.api_client.get_with_params(&path, Some(query)).await
74 }
75
76 pub async fn create_tables(
84 &self,
85 db_name: &str,
86 ensure_parent: bool,
87 tables: &[Table],
88 ) -> Result<Vec<Table>> {
89 let query = EnsureParentQuery {
90 ensure_parent: Some(ensure_parent),
91 };
92 let path = format!("raw/dbs/{db_name}/tables");
93 let items = Items::new(tables);
94 let result: ItemsVec<Table, Cursor> = self
95 .api_client
96 .post_with_query(&path, &items, Some(query))
97 .await?;
98 Ok(result.items)
99 }
100
101 pub async fn delete_tables(&self, db_name: &str, to_delete: &[Table]) -> Result<()> {
108 let path = format!("raw/dbs/{db_name}/tables/delete");
109 let items = Items::new(to_delete);
110 self.api_client
111 .post::<::serde_json::Value, _>(&path, &items)
112 .await?;
113 Ok(())
114 }
115
116 pub async fn retrieve_cursors_for_parallel_reads(
125 &self,
126 db_name: &str,
127 table_name: &str,
128 params: Option<RetrieveCursorsQuery>,
129 ) -> Result<Vec<String>> {
130 let path = format!("raw/dbs/{db_name}/tables/{table_name}/cursors");
131 let result: ItemsVec<String, Cursor> =
132 self.api_client.get_with_params(&path, params).await?;
133 Ok(result.items)
134 }
135
136 pub async fn retrieve_rows(
144 &self,
145 db_name: &str,
146 table_name: &str,
147 params: Option<RetrieveRowsQuery>,
148 ) -> Result<ItemsVec<RawRow, Cursor>> {
149 let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows");
150 self.api_client.get_with_params(&path, params).await
151 }
152
153 pub fn retrieve_all_rows_stream<'a>(
167 &'a self,
168 db_name: &'a str,
169 table_name: &'a str,
170 params: Option<RetrieveRowsQuery>,
171 ) -> impl TryStream<Ok = RawRow, Error = crate::Error, Item = Result<RawRow>> + Send + 'a {
172 let req = params.unwrap_or_default();
173 let initial_state = match &req.cursor {
174 Some(p) => CursorState::Some(p.to_owned()),
175 None => CursorState::Initial,
176 };
177 let state = CursorStreamState {
178 req,
179 responses: VecDeque::new(),
180 next_cursor: initial_state,
181 };
182
183 try_unfold(state, move |mut state| async move {
184 if let Some(next) = state.responses.pop_front() {
185 Ok(Some((next, state)))
186 } else {
187 let cursor = match std::mem::take(&mut state.next_cursor) {
188 CursorState::Initial => None,
189 CursorState::Some(x) => Some(x),
190 CursorState::End => {
191 return Ok(None);
192 }
193 };
194 state.req.cursor = cursor;
195 let response = self
196 .retrieve_rows(db_name, table_name, Some(state.req.clone()))
197 .await?;
198
199 state.responses.extend(response.items);
200 state.next_cursor = match response.extra_fields.next_cursor {
201 Some(x) => CursorState::Some(x),
202 None => CursorState::End,
203 };
204 if let Some(next) = state.responses.pop_front() {
205 Ok(Some((next, state)))
206 } else {
207 Ok(None)
208 }
209 }
210 })
211 }
212
213 pub async fn retrieve_all_rows(
223 &self,
224 db_name: &str,
225 table_name: &str,
226 params: Option<RetrieveRowsQuery>,
227 ) -> Result<Vec<RawRow>> {
228 self.retrieve_all_rows_stream(db_name, table_name, params)
229 .try_collect()
230 .await
231 }
232
233 pub async fn retrieve_all_rows_partitioned(
241 &self,
242 db_name: &str,
243 table_name: &str,
244 params: RetrieveAllPartitionedQuery,
245 ) -> Result<Vec<RawRow>> {
246 self.retrieve_all_rows_partitioned_stream(db_name, table_name, params)
247 .try_collect()
248 .await
249 }
250
251 pub fn retrieve_all_rows_partitioned_stream<'a>(
259 &'a self,
260 db_name: &'a str,
261 table_name: &'a str,
262 params: RetrieveAllPartitionedQuery,
263 ) -> impl TryStream<Ok = RawRow, Error = crate::Error, Item = Result<RawRow>> + Send + 'a {
264 self.retrieve_cursors_for_parallel_reads(
265 db_name,
266 table_name,
267 Some(RetrieveCursorsQuery {
268 min_last_updated_time: params.min_last_updated_time,
269 max_last_updated_time: params.max_last_updated_time,
270 number_of_cursors: params.number_of_cursors,
271 }),
272 )
273 .into_stream()
274 .map_ok(move |cursors| {
275 let mut streams = SelectAll::new();
276 for cursor in cursors {
277 let query = RetrieveRowsQuery {
278 limit: params.limit,
279 columns: params.columns.clone(),
280 cursor: Some(cursor),
281 min_last_updated_time: params.min_last_updated_time,
282 max_last_updated_time: params.max_last_updated_time,
283 };
284 streams.push(
285 self.retrieve_all_rows_stream(db_name, table_name, Some(query))
286 .boxed(),
287 );
288 }
289 streams
290 })
291 .try_flatten()
292 }
293
294 pub async fn insert_rows(
305 &self,
306 db_name: &str,
307 table_name: &str,
308 ensure_parent: bool,
309 rows: &[RawRowCreate],
310 ) -> Result<()> {
311 let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows");
312 let query = EnsureParentQuery {
313 ensure_parent: Some(ensure_parent),
314 };
315 let items = Items::new(rows);
316 self.api_client
317 .post_with_query::<::serde_json::Value, _, EnsureParentQuery>(
318 &path,
319 &items,
320 Some(query),
321 )
322 .await?;
323 Ok(())
324 }
325
326 pub async fn retrieve_row(&self, db_name: &str, table_name: &str, key: &str) -> Result<RawRow> {
334 let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows/{key}");
335 self.api_client.get(&path).await
336 }
337
338 pub async fn delete_rows(
346 &self,
347 db_name: &str,
348 table_name: &str,
349 to_delete: &[DeleteRow],
350 ) -> Result<()> {
351 let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows/delete");
352 let items = Items::new(to_delete);
353 self.api_client
354 .post::<::serde_json::Value, _>(&path, &items)
355 .await?;
356 Ok(())
357 }
358}