cognite/api/data_modeling/
records.rs

1use std::collections::HashMap;
2
3use serde::{de::DeserializeOwned, Serialize};
4
5use crate::{
6    models::{
7        records::{
8            CursorAndHasNext, Record, RecordWrite, RecordsRetrieveRequest, RecordsSyncRequest,
9        },
10        ItemId,
11    },
12    Items, ItemsVec, RawValue, Resource, Result,
13};
14
15pub type RecordsResource = Resource<Record<HashMap<String, RawValue>>>;
16
17impl RecordsResource {
18    /// Ingest records into a stream.
19    ///
20    /// Note: The maximum total request size is 10 MB.
21    ///
22    /// # Arguments
23    ///
24    /// * `stream_id` - ID of the stream to ingest records into.
25    /// * `records` - Records to ingest.
26    pub async fn ingest<T: Serialize>(
27        &self,
28        stream_id: &str,
29        records: &[RecordWrite<T>],
30    ) -> Result<()> {
31        self.api_client
32            .post::<serde_json::Value, _>(
33                &format!("streams/{stream_id}/records"),
34                &Items::new(records),
35            )
36            .await?;
37        Ok(())
38    }
39
40    /// Upsert records into a stream.
41    ///
42    /// Note: The maximum total request size is 10 MB.
43    ///
44    /// # Arguments
45    ///
46    /// * `stream_id` - ID of the stream to ingest records into.
47    /// * `records` - Records to ingest.
48    pub async fn upsert<T: Serialize>(
49        &self,
50        stream_id: &str,
51        records: &[RecordWrite<T>],
52    ) -> Result<()> {
53        self.api_client
54            .post::<serde_json::Value, _>(
55                &format!("streams/{stream_id}/records/upsert"),
56                &Items::new(records),
57            )
58            .await?;
59        Ok(())
60    }
61
62    /// Retrieve records from a stream.
63    ///
64    /// # Arguments
65    ///
66    /// * `stream_id` - ID of the stream to retrieve records from.
67    /// * `request` - Request with optional filter and sort.
68    pub async fn retrieve<T: DeserializeOwned>(
69        &self,
70        stream_id: &str,
71        request: &RecordsRetrieveRequest,
72    ) -> Result<ItemsVec<Record<T>>> {
73        self.api_client
74            .post(&format!("streams/{stream_id}/records/filter"), request)
75            .await
76    }
77
78    /// Subscribe to changes for records from the stream, matching a supplied filter.
79    ///
80    /// # Arguments
81    ///
82    /// * `stream_id` - ID of the stream to subscribe to.
83    /// * `request` - Request with optional filter.
84    pub async fn sync<T: DeserializeOwned>(
85        &self,
86        stream_id: &str,
87        request: &RecordsSyncRequest,
88    ) -> Result<ItemsVec<Record<T>, CursorAndHasNext>> {
89        self.api_client
90            .post(&format!("streams/{stream_id}/records/sync"), request)
91            .await
92    }
93
94    /// Delete records from a stream.
95    ///
96    /// # Arguments
97    ///
98    /// * `stream_id` - ID of the stream to delete records from.
99    /// * `items` - IDs of the records to delete.
100    pub async fn delete(&self, stream_id: &str, items: &[ItemId]) -> Result<()> {
101        self.api_client
102            .post_request(&format!("streams/{stream_id}/records/delete"))
103            .json(&Items::new(items))?
104            .accept_nothing()
105            .send()
106            .await?;
107        Ok(())
108    }
109}