otlp2pipeline 0.2.0

OTLP ingestion worker for Cloudflare Pipelines and AWS
Documentation
use anyhow::Result;
use serde::{Deserialize, Serialize};

use super::CloudflareClient;

#[derive(Serialize)]
struct CreateBucketRequest<'a> {
    name: &'a str,
}

/// CORS rule for R2 bucket (matches Cloudflare API format)
#[derive(Serialize)]
pub struct CorsRule {
    pub allowed: CorsAllowed,
    #[serde(rename = "maxAgeSeconds")]
    pub max_age_seconds: u32,
}

#[derive(Serialize)]
pub struct CorsAllowed {
    pub origins: Vec<String>,
    pub methods: Vec<String>,
    pub headers: Vec<String>,
}

#[derive(Serialize)]
struct CorsConfig {
    rules: Vec<CorsRule>,
}

#[derive(Serialize)]
struct SetCredentialRequest<'a> {
    token: &'a str,
}

#[derive(Serialize)]
struct MaintenanceConfig {
    compaction: CompactionConfig,
    snapshot_expiration: SnapshotExpirationConfig,
}

#[derive(Serialize)]
struct CompactionConfig {
    state: &'static str,
}

#[derive(Serialize)]
struct SnapshotExpirationConfig {
    state: &'static str,
    max_snapshot_age: &'static str,
    min_snapshots_to_keep: u32,
}

#[derive(Deserialize)]
pub struct Bucket {
    pub name: String,
}

impl CloudflareClient {
    /// Create an R2 bucket
    pub async fn create_bucket(&self, name: &str) -> Result<Option<Bucket>> {
        self.post_idempotent("/r2/buckets", &CreateBucketRequest { name })
            .await
    }

    /// Delete an R2 bucket
    pub async fn delete_bucket(&self, name: &str) -> Result<()> {
        self.delete(&format!("/r2/buckets/{}", name)).await
    }

    /// Set CORS configuration for an R2 bucket
    pub async fn set_bucket_cors(&self, bucket: &str, rules: Vec<CorsRule>) -> Result<()> {
        self.put_void(
            &format!("/r2/buckets/{}/cors", bucket),
            &CorsConfig { rules },
        )
        .await
    }

    /// Enable R2 Data Catalog for a bucket
    pub async fn enable_catalog(&self, bucket: &str) -> Result<()> {
        self.post_void(
            &format!("/r2-catalog/{}/enable", bucket),
            &serde_json::json!({}),
        )
        .await
    }

    /// Set service credential for catalog maintenance
    pub async fn set_catalog_credential(&self, bucket: &str, token: &str) -> Result<()> {
        self.post_void(
            &format!("/r2-catalog/{}/credential", bucket),
            &SetCredentialRequest { token },
        )
        .await
    }

    /// Configure catalog maintenance (compaction + snapshot expiration)
    pub async fn configure_catalog_maintenance(&self, bucket: &str) -> Result<()> {
        self.post_void(
            &format!("/r2-catalog/{}/maintenance-configs", bucket),
            &MaintenanceConfig {
                compaction: CompactionConfig { state: "enabled" },
                snapshot_expiration: SnapshotExpirationConfig {
                    state: "enabled",
                    max_snapshot_age: "1d",
                    min_snapshots_to_keep: 1,
                },
            },
        )
        .await
    }
}