orb_billing/client/
events.rs

1// Copyright Materialize, Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::collections::BTreeMap;
17
18use futures_core::Stream;
19use reqwest::Method;
20use serde::{Deserialize, Serialize};
21use serde_json::Number;
22use time::OffsetDateTime;
23
24use crate::client::customers::CustomerId;
25use crate::client::Client;
26use crate::config::ListParams;
27use crate::error::Error;
28use crate::util::StrIteratorExt;
29
/// Leading path segment for the event endpoints.
///
/// The search, amend, and deprecate requests in this file extend it with
/// further segments via `chain_one`.
const EVENTS_PATH: [&str; 1] = ["events"];
31
32/// The subset of [`Event`] used in event ingestion requests.
33#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
34pub struct IngestEventRequest<'a> {
35    /// A unique, client-generated identifier for the event.
36    ///
37    /// Exactly one event with a given idempotency key will be ingested, which
38    /// allows for safe request retries.
39    pub idempotency_key: &'a str,
40    /// The Orb ID for the customer with which the event is associated.
41    #[serde(flatten)]
42    pub customer_id: CustomerId<'a>,
43    /// A name that meaningfully identifies the action or event.
44    pub event_name: &'a str,
45    /// Arbitrary properties associated with the event.
46    pub properties: &'a BTreeMap<String, EventPropertyValue>,
47    /// The time at which the event occurred.
48    ///
49    /// Important for attributing usage to a given billing period.
50    #[serde(with = "time::serde::rfc3339")]
51    pub timestamp: OffsetDateTime,
52}
53
/// The ingestion mode.
///
/// Selects whether an ingestion request asks for debugging information in
/// the response.
///
/// Derives `PartialEq`, `Eq`, and `Hash` in addition to `Debug`, `Clone`, and
/// `Copy`, matching the other public data types in this module, so that modes
/// can be compared directly and used as set or map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IngestionMode {
    /// Enable debugging information.
    Debug,
    /// Disable debugging information for improved performance.
    Production,
}
62
63/// The response to an event ingestion request.
64#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
65pub struct IngestEventResponse {
66    /// Debug information.
67    ///
68    /// Only present when the request is made with [`IngestionMode::Debug`].
69    pub debug: Option<IngestEventDebugResponse>,
70}
71
72/// The type of [`IngestEventResponse::debug`].
73#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
74pub struct IngestEventDebugResponse {
75    /// Contains the IDs of events that were already known to Orb.
76    pub duplicate: Vec<String>,
77    /// Contains the IDs of events that were new to Orb.
78    pub ingested: Vec<String>,
79}
80
81/// The subset of [`Event`] used in amendment requests.
82#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
83pub struct AmendEventRequest<'a> {
84    /// The Orb ID for the customer with which the event is associated.
85    #[serde(flatten)]
86    pub customer_id: CustomerId<'a>,
87    /// A name that meaningfully identifies the action or event.
88    pub event_name: &'a str,
89    /// Arbitrary properties associated with the event.
90    pub properties: &'a BTreeMap<String, EventPropertyValue>,
91    /// The time at which the event occurred.
92    ///
93    /// Important for attributing usage to a given billing period.
94    #[serde(with = "time::serde::rfc3339")]
95    pub timestamp: OffsetDateTime,
96}
97
98/// An Orb event.
99#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
100pub struct Event {
101    /// A unique, client-generated identifier for the event.
102    pub id: String,
103    /// The Orb ID for the customer with which the event is associated.
104    pub customer_id: String,
105    /// The external ID for the customer with which the event is associated.
106    pub external_customer_id: Option<String>,
107    /// A name that meaningfully identifies the action or event.
108    pub event_name: String,
109    /// Arbitrary properties associated with the event.
110    pub properties: BTreeMap<String, EventPropertyValue>,
111    /// The time at which the event occurred.
112    ///
113    /// Important for attributing usage to a given billing period.
114    #[serde(with = "time::serde::rfc3339")]
115    pub timestamp: OffsetDateTime,
116}
117
118/// The value of a property assocaited with an [`Event`].
119#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]
120#[serde(untagged)]
121pub enum EventPropertyValue {
122    /// A string value.
123    String(String),
124    /// A numeric value.
125    Number(Number),
126    /// A boolean value.
127    Bool(bool),
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
131struct EventFilter<'a> {
132    #[serde(skip_serializing_if = "Option::is_none")]
133    event_ids: Option<&'a [&'a str]>,
134    #[serde(skip_serializing_if = "Option::is_none")]
135    invoice_id: Option<&'a str>,
136    #[serde(with = "time::serde::rfc3339::option")]
137    #[serde(skip_serializing_if = "Option::is_none")]
138    timeframe_start: Option<OffsetDateTime>,
139    #[serde(with = "time::serde::rfc3339::option")]
140    #[serde(skip_serializing_if = "Option::is_none")]
141    timeframe_end: Option<OffsetDateTime>,
142}
143
144/// Parameters for an event search operation.
145#[derive(Debug, Clone)]
146pub struct EventSearchParams<'a> {
147    inner: ListParams,
148    filter: EventFilter<'a>,
149}
150
151impl<'a> Default for EventSearchParams<'a> {
152    fn default() -> EventSearchParams<'a> {
153        EventSearchParams::DEFAULT
154    }
155}
156
157impl<'a> EventSearchParams<'a> {
158    /// The default subscription list parameters.
159    ///
160    /// Exposed as a constant for use in constant evaluation contexts.
161    pub const DEFAULT: EventSearchParams<'static> = EventSearchParams {
162        inner: ListParams::DEFAULT,
163        filter: EventFilter {
164            event_ids: None,
165            invoice_id: None,
166            timeframe_start: None,
167            timeframe_end: None,
168        },
169    };
170
171    /// Sets the page size for the list operation.
172    ///
173    /// See [`ListParams::page_size`].
174    pub const fn page_size(mut self, page_size: u64) -> Self {
175        self.inner = self.inner.page_size(page_size);
176        self
177    }
178
179    /// Filters the search to the specified event IDs.
180    pub const fn event_ids(mut self, filter: &'a [&'a str]) -> Self {
181        self.filter.event_ids = Some(filter);
182        self
183    }
184
185    /// Filters the search to the specified invoice ID.
186    pub const fn invoice_id(mut self, filter: &'a str) -> Self {
187        self.filter.invoice_id = Some(filter);
188        self
189    }
190
191    /// Filters the search to events falling on or after the specified datetime.
192    pub const fn timeframe_start(mut self, start: OffsetDateTime) -> Self {
193        self.filter.timeframe_start = Some(start);
194        self
195    }
196
197    /// Filters the search to events falling before the specified datetime.
198    pub const fn timeframe_end(mut self, end: OffsetDateTime) -> Self {
199        self.filter.timeframe_end = Some(end);
200        self
201    }
202}
203
204impl Client {
205    /// Searches events.
206    pub fn search_events(
207        &self,
208        params: &EventSearchParams,
209    ) -> impl Stream<Item = Result<Event, Error>> + '_ {
210        let req = self.build_request(Method::POST, EVENTS_PATH.chain_one("search"));
211        let req = req.json(&params.filter);
212        self.stream_paginated_request(&params.inner, req)
213    }
214
215    /// Ingests events.
216    pub async fn ingest_events(
217        &self,
218        mode: IngestionMode,
219        events: &[IngestEventRequest<'_>],
220    ) -> Result<IngestEventResponse, Error> {
221        #[derive(Serialize)]
222        struct Envelope<'a> {
223            events: &'a [IngestEventRequest<'a>],
224        }
225
226        let req = self.build_request(Method::POST, ["ingest"]);
227        let req = req.query(&[("debug", matches!(mode, IngestionMode::Debug))]);
228        let req = req.json(&Envelope { events });
229        let res = self.send_request(req).await?;
230        Ok(res)
231    }
232
233    /// Amends an event by ID.
234    pub async fn amend_event(&self, id: &str, event: &AmendEventRequest<'_>) -> Result<(), Error> {
235        let req = self.build_request(Method::PUT, EVENTS_PATH.chain_one(id));
236        let req = req.json(event);
237        let _: serde_json::Value = self.send_request(req).await?;
238        Ok(())
239    }
240
241    /// Deprecates an event by ID.
242    pub async fn deprecate_event(&self, id: &str) -> Result<(), Error> {
243        let req = self.build_request(
244            Method::PUT,
245            EVENTS_PATH.chain_one(id).chain_one("deprecate"),
246        );
247        let _: serde_json::Value = self.send_request(req).await?;
248        Ok(())
249    }
250}