1use std::collections::BTreeMap;
17
18use futures_core::Stream;
19use reqwest::Method;
20use serde::{Deserialize, Serialize};
21use serde_json::Number;
22use time::OffsetDateTime;
23
24use crate::client::customers::CustomerId;
25use crate::client::Client;
26use crate::config::ListParams;
27use crate::error::Error;
28use crate::util::StrIteratorExt;
29
/// Path segments that prefix the event search, amend, and deprecate
/// endpoints built in this module.
const EVENTS_PATH: [&str; 1] = ["events"];
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
34pub struct IngestEventRequest<'a> {
35 pub idempotency_key: &'a str,
40 #[serde(flatten)]
42 pub customer_id: CustomerId<'a>,
43 pub event_name: &'a str,
45 pub properties: &'a BTreeMap<String, EventPropertyValue>,
47 #[serde(with = "time::serde::rfc3339")]
51 pub timestamp: OffsetDateTime,
52}
53
/// Selects whether [`Client::ingest_events`] asks the API for debug output.
///
/// Derives the same comparison and hashing traits as the other public data
/// types in this module, so modes can be compared and used as map or set
/// keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IngestionMode {
    /// Send the ingestion request with the `debug` query parameter set to
    /// `true`.
    Debug,
    /// Send the ingestion request with the `debug` query parameter set to
    /// `false`.
    Production,
}
62
63#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
65pub struct IngestEventResponse {
66 pub debug: Option<IngestEventDebugResponse>,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
74pub struct IngestEventDebugResponse {
75 pub duplicate: Vec<String>,
77 pub ingested: Vec<String>,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
83pub struct AmendEventRequest<'a> {
84 #[serde(flatten)]
86 pub customer_id: CustomerId<'a>,
87 pub event_name: &'a str,
89 pub properties: &'a BTreeMap<String, EventPropertyValue>,
91 #[serde(with = "time::serde::rfc3339")]
95 pub timestamp: OffsetDateTime,
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
100pub struct Event {
101 pub id: String,
103 pub customer_id: String,
105 pub external_customer_id: Option<String>,
107 pub event_name: String,
109 pub properties: BTreeMap<String, EventPropertyValue>,
111 #[serde(with = "time::serde::rfc3339")]
115 pub timestamp: OffsetDateTime,
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]
120#[serde(untagged)]
121pub enum EventPropertyValue {
122 String(String),
124 Number(Number),
126 Bool(bool),
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
131struct EventFilter<'a> {
132 #[serde(skip_serializing_if = "Option::is_none")]
133 event_ids: Option<&'a [&'a str]>,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 invoice_id: Option<&'a str>,
136 #[serde(with = "time::serde::rfc3339::option")]
137 #[serde(skip_serializing_if = "Option::is_none")]
138 timeframe_start: Option<OffsetDateTime>,
139 #[serde(with = "time::serde::rfc3339::option")]
140 #[serde(skip_serializing_if = "Option::is_none")]
141 timeframe_end: Option<OffsetDateTime>,
142}
143
144#[derive(Debug, Clone)]
146pub struct EventSearchParams<'a> {
147 inner: ListParams,
148 filter: EventFilter<'a>,
149}
150
151impl<'a> Default for EventSearchParams<'a> {
152 fn default() -> EventSearchParams<'a> {
153 EventSearchParams::DEFAULT
154 }
155}
156
157impl<'a> EventSearchParams<'a> {
158 pub const DEFAULT: EventSearchParams<'static> = EventSearchParams {
162 inner: ListParams::DEFAULT,
163 filter: EventFilter {
164 event_ids: None,
165 invoice_id: None,
166 timeframe_start: None,
167 timeframe_end: None,
168 },
169 };
170
171 pub const fn page_size(mut self, page_size: u64) -> Self {
175 self.inner = self.inner.page_size(page_size);
176 self
177 }
178
179 pub const fn event_ids(mut self, filter: &'a [&'a str]) -> Self {
181 self.filter.event_ids = Some(filter);
182 self
183 }
184
185 pub const fn invoice_id(mut self, filter: &'a str) -> Self {
187 self.filter.invoice_id = Some(filter);
188 self
189 }
190
191 pub const fn timeframe_start(mut self, start: OffsetDateTime) -> Self {
193 self.filter.timeframe_start = Some(start);
194 self
195 }
196
197 pub const fn timeframe_end(mut self, end: OffsetDateTime) -> Self {
199 self.filter.timeframe_end = Some(end);
200 self
201 }
202}
203
204impl Client {
205 pub fn search_events(
207 &self,
208 params: &EventSearchParams,
209 ) -> impl Stream<Item = Result<Event, Error>> + '_ {
210 let req = self.build_request(Method::POST, EVENTS_PATH.chain_one("search"));
211 let req = req.json(¶ms.filter);
212 self.stream_paginated_request(¶ms.inner, req)
213 }
214
215 pub async fn ingest_events(
217 &self,
218 mode: IngestionMode,
219 events: &[IngestEventRequest<'_>],
220 ) -> Result<IngestEventResponse, Error> {
221 #[derive(Serialize)]
222 struct Envelope<'a> {
223 events: &'a [IngestEventRequest<'a>],
224 }
225
226 let req = self.build_request(Method::POST, ["ingest"]);
227 let req = req.query(&[("debug", matches!(mode, IngestionMode::Debug))]);
228 let req = req.json(&Envelope { events });
229 let res = self.send_request(req).await?;
230 Ok(res)
231 }
232
233 pub async fn amend_event(&self, id: &str, event: &AmendEventRequest<'_>) -> Result<(), Error> {
235 let req = self.build_request(Method::PUT, EVENTS_PATH.chain_one(id));
236 let req = req.json(event);
237 let _: serde_json::Value = self.send_request(req).await?;
238 Ok(())
239 }
240
241 pub async fn deprecate_event(&self, id: &str) -> Result<(), Error> {
243 let req = self.build_request(
244 Method::PUT,
245 EVENTS_PATH.chain_one(id).chain_one("deprecate"),
246 );
247 let _: serde_json::Value = self.send_request(req).await?;
248 Ok(())
249 }
250}