use anyhow::Result;
use serde::{Deserialize, Serialize};
use super::CloudflareClient;
/// JSON body sent when creating a stream.
#[derive(Serialize)]
struct CreateStreamRequest<'a> {
    /// Name of the stream to create.
    name: &'a str,
    /// Format descriptor for the stream.
    format: Format,
    /// Field definitions for the stream.
    schema: Schema<'a>,
    /// HTTP settings for the stream.
    http: HttpConfig,
    /// Worker binding settings for the stream.
    worker_binding: WorkerBindingConfig,
}
/// Format descriptor embedded in a stream creation request.
#[derive(Serialize)]
struct Format {
    /// Format identifier; emitted under the JSON key `type`.
    #[serde(rename = "type")]
    format_type: &'static str,
}
/// Schema wrapper embedded in a stream creation request.
#[derive(Serialize)]
struct Schema<'a> {
    /// Borrowed list of field definitions, serialized as `fields`.
    fields: &'a [SchemaField],
}
/// A single field definition in a stream schema.
///
/// Serializes to, and deserializes from, an object with the keys `name`,
/// `type` and `required`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SchemaField {
    /// Field name.
    pub name: String,
    /// Field type; read from and written to the JSON key `type`.
    #[serde(rename = "type")]
    pub field_type: String,
    /// Whether the field is required. Falls back to `false` when the key is
    /// missing from the deserialized input.
    #[serde(default)]
    pub required: bool,
}
/// HTTP settings embedded in a stream creation request.
#[derive(Serialize)]
struct HttpConfig {
    /// Serialized as the `enabled` flag.
    enabled: bool,
    /// Serialized as the `authentication` flag.
    authentication: bool,
}
/// Worker binding settings embedded in a stream creation request.
#[derive(Serialize)]
struct WorkerBindingConfig {
    /// Serialized as the `enabled` flag.
    enabled: bool,
}
/// A stream as deserialized from an API response.
#[derive(Debug, Clone, Deserialize)]
pub struct Stream {
    /// Stream identifier.
    pub id: String,
    /// Stream name.
    pub name: String,
    /// Stream endpoint, if the response carries one.
    pub endpoint: Option<String>,
}
/// JSON body sent when creating a sink.
#[derive(Serialize)]
struct CreateSinkRequest<'a> {
    /// Name of the sink to create.
    name: &'a str,
    /// Sink kind; emitted under the JSON key `type`.
    #[serde(rename = "type")]
    sink_type: &'static str,
    /// Output format settings.
    format: SinkFormat,
    /// Destination settings.
    config: SinkConfig<'a>,
}
/// Output format settings embedded in a sink creation request.
#[derive(Serialize)]
struct SinkFormat {
    /// Format identifier; emitted under the JSON key `type`.
    #[serde(rename = "type")]
    format_type: &'static str,
    /// Compression identifier, serialized as `compression`.
    compression: &'static str,
}
/// Destination settings embedded in a sink creation request.
#[derive(Serialize)]
struct SinkConfig<'a> {
    /// Bucket name.
    bucket: &'a str,
    /// Namespace name.
    namespace: &'static str,
    /// Table name.
    table_name: &'a str,
    /// Token sent along with the sink configuration.
    token: &'a str,
    /// Rolling policy settings.
    rolling_policy: RollingPolicy,
}
/// Rolling policy embedded in a sink's configuration.
#[derive(Serialize)]
struct RollingPolicy {
    /// Interval in seconds, serialized as `interval_seconds`.
    interval_seconds: u32,
}
/// A sink as deserialized from an API response.
#[derive(Debug, Clone, Deserialize)]
pub struct Sink {
    /// Sink identifier.
    pub id: String,
    /// Sink name.
    pub name: String,
}
/// JSON body sent when creating a pipeline.
#[derive(Serialize)]
struct CreatePipelineRequest<'a> {
    /// Name of the pipeline to create.
    name: &'a str,
    /// SQL statement submitted with the request.
    sql: String,
}
/// A pipeline as deserialized from an API response.
#[derive(Debug, Clone, Deserialize)]
pub struct Pipeline {
    /// Pipeline identifier.
    pub id: String,
    /// Pipeline name.
    pub name: String,
    /// Pipeline status, if the response carries one.
    pub status: Option<String>,
}
// Stream, sink and pipeline operations under the `/pipelines/v1` path prefix.
impl CloudflareClient {
    /// Lists streams by calling `get` on `/pipelines/v1/streams`.
    pub async fn list_streams(&self) -> Result<Vec<Stream>> {
        let path = "/pipelines/v1/streams";
        self.get(path).await
    }

    /// Creates a stream called `name` whose schema consists of `schema`.
    ///
    /// The request asks for the `json` format with HTTP, HTTP authentication
    /// and the worker binding all switched on. It is submitted through
    /// `post_idempotent`.
    ///
    /// NOTE(review): the `Option` in the return type comes from
    /// `post_idempotent`; presumably `None` means the stream already
    /// existed — confirm against that helper.
    pub async fn create_stream(
        &self,
        name: &str,
        schema: &[SchemaField],
    ) -> Result<Option<Stream>> {
        let body = CreateStreamRequest {
            name,
            format: Format { format_type: "json" },
            schema: Schema { fields: schema },
            http: HttpConfig {
                enabled: true,
                authentication: true,
            },
            worker_binding: WorkerBindingConfig { enabled: true },
        };
        self.post_idempotent("/pipelines/v1/streams", &body).await
    }

    /// Deletes the stream identified by `id`.
    pub async fn delete_stream(&self, id: &str) -> Result<()> {
        let path = format!("/pipelines/v1/streams/{id}");
        self.delete(&path).await
    }

    /// Lists sinks by calling `get` on `/pipelines/v1/sinks`.
    pub async fn list_sinks(&self) -> Result<Vec<Sink>> {
        let path = "/pipelines/v1/sinks";
        self.get(path).await
    }

    /// Creates an `r2_data_catalog` sink called `name`.
    ///
    /// The sink is configured with `parquet` format and `zstd` compression,
    /// targeting `table_name` in the `default` namespace of `bucket`.
    /// `token` is passed through in the sink configuration, and
    /// `interval_seconds` sets the rolling policy interval.
    ///
    /// NOTE(review): the `Option` in the return type comes from
    /// `post_idempotent`; presumably `None` means the sink already
    /// existed — confirm against that helper.
    pub async fn create_sink(
        &self,
        name: &str,
        bucket: &str,
        table_name: &str,
        token: &str,
        interval_seconds: u32,
    ) -> Result<Option<Sink>> {
        let format = SinkFormat {
            format_type: "parquet",
            compression: "zstd",
        };
        let config = SinkConfig {
            bucket,
            namespace: "default",
            table_name,
            token,
            rolling_policy: RollingPolicy { interval_seconds },
        };
        let body = CreateSinkRequest {
            name,
            sink_type: "r2_data_catalog",
            format,
            config,
        };
        self.post_idempotent("/pipelines/v1/sinks", &body).await
    }

    /// Deletes the sink identified by `id`.
    pub async fn delete_sink(&self, id: &str) -> Result<()> {
        let path = format!("/pipelines/v1/sinks/{id}");
        self.delete(&path).await
    }

    /// Lists pipelines by calling `get` on `/pipelines/v1/pipelines`.
    pub async fn list_pipelines(&self) -> Result<Vec<Pipeline>> {
        let path = "/pipelines/v1/pipelines";
        self.get(path).await
    }

    /// Creates a pipeline called `name` that inserts everything selected
    /// from `stream` into `sink`.
    ///
    /// `stream` and `sink` are interpolated verbatim into the SQL statement,
    /// so callers must pass values that are valid as-is in that position.
    ///
    /// NOTE(review): the `Option` in the return type comes from
    /// `post_idempotent`; presumably `None` means the pipeline already
    /// existed — confirm against that helper.
    pub async fn create_pipeline(
        &self,
        name: &str,
        stream: &str,
        sink: &str,
    ) -> Result<Option<Pipeline>> {
        let sql = format!("INSERT INTO {sink} SELECT * FROM {stream}");
        let body = CreatePipelineRequest { name, sql };
        self.post_idempotent("/pipelines/v1/pipelines", &body).await
    }

    /// Deletes the pipeline identified by `id`.
    pub async fn delete_pipeline(&self, id: &str) -> Result<()> {
        let path = format!("/pipelines/v1/pipelines/{id}");
        self.delete(&path).await
    }
}