superposition_provider 0.102.0

Open feature provider for Superposition.
Documentation
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use serde_json::{Map, Value};
use superposition_sdk::error::SdkError;
use superposition_sdk::types::DimensionMatchStrategy;
use superposition_sdk::{Client, Config as SdkConfig};

use crate::conversions;
use crate::types::{Result, SuperpositionError, SuperpositionOptions};
use crate::utils::ConversionUtils;

use super::{ConfigData, ExperimentData, FetchResponse, SuperpositionDataSource};

pub struct HttpDataSource {
    options: SuperpositionOptions,
    client: Client,
}

fn create_client(options: &SuperpositionOptions) -> Client {
    let sdk_config = SdkConfig::builder()
        .endpoint_url(&options.endpoint)
        .bearer_token(options.token.clone().into())
        .behavior_version_latest()
        .build();

    Client::from_conf(sdk_config)
}

impl HttpDataSource {
    pub fn new(options: SuperpositionOptions) -> Self {
        Self {
            client: create_client(&options),
            options,
        }
    }

    async fn fetch_experiments_with_filters(
        &self,
        context: Option<Map<String, Value>>,
        prefix_filter: Option<Vec<String>>,
        if_modified_since: Option<DateTime<Utc>>,
        filter: Option<DimensionMatchStrategy>,
    ) -> Result<FetchResponse<ExperimentData>> {
        let mut experiment_builder = self
            .client
            .get_experiment_config()
            .workspace_id(&self.options.workspace_id)
            .org_id(&self.options.org_id);

        if let Some(modified_since) = if_modified_since
            .and_then(|t| t.timestamp_nanos_opt())
            .and_then(|t| aws_smithy_types::DateTime::from_nanos(t.into()).ok())
        {
            experiment_builder = experiment_builder.if_modified_since(modified_since);
        }

        if let Some(context) = context {
            if !context.is_empty() {
                let context = conversions::map_to_hashmap(context);
                experiment_builder = experiment_builder.set_context(Some(context));
            }
        }

        if let Some(prefixes) = prefix_filter {
            if !prefixes.is_empty() {
                experiment_builder = experiment_builder.set_prefix(Some(prefixes));
            }
        }

        if let Some(filter) = filter {
            experiment_builder = experiment_builder.dimension_match_strategy(filter);
        }

        log::info!("Fetching experiments from Superposition service using SDK");
        let experiments_result = experiment_builder.send().await;

        let experiments_response = match experiments_result {
            Ok(res) => {
                let modified_at =
                    Utc.timestamp_nanos(res.last_modified.as_nanos() as i64);
                ConversionUtils::convert_experiment_config_response(res)
                    .map(|d| ExperimentData {
                        data: d,
                        fetched_at: modified_at,
                    })
                    .map(FetchResponse::Data)
            }
            Err(SdkError::ResponseError(r)) if r.raw().status().as_u16() == 304 => {
                Ok(FetchResponse::NotModified)
            }
            Err(e) => Err(SuperpositionError::NetworkError(format!(
                "Failed to list experiments: {}",
                e
            ))),
        }?;

        log::info!("Successfully fetched {}", experiments_response);

        Ok(experiments_response)
    }

    async fn fetch_config_with_filters(
        &self,
        context: Option<Map<String, Value>>,
        prefix_filter: Option<Vec<String>>,
        if_modified_since: Option<DateTime<Utc>>,
    ) -> Result<FetchResponse<ConfigData>> {
        log::info!("Fetching config from Superposition service using SDK");
        let mut builder = self
            .client
            .get_config()
            .workspace_id(&self.options.workspace_id)
            .org_id(&self.options.org_id);

        if let Some(modified_since) = if_modified_since
            .and_then(|t| t.timestamp_nanos_opt())
            .and_then(|t| aws_smithy_types::DateTime::from_nanos(t.into()).ok())
        {
            builder = builder.if_modified_since(modified_since);
        }

        if let Some(context) = context {
            if !context.is_empty() {
                let context = conversions::map_to_hashmap(context);
                builder = builder.set_context(Some(context));
            }
        }

        if let Some(prefixes) = prefix_filter {
            if !prefixes.is_empty() {
                builder = builder.set_prefix(Some(prefixes));
            }
        }

        let config_result = builder.send().await;

        let resp = match config_result {
            Ok(res) => {
                let modified_at =
                    Utc.timestamp_nanos(res.last_modified.as_nanos() as i64);
                ConversionUtils::convert_get_config_response(res)
                    .map(|d| ConfigData {
                        config: d,
                        fetched_at: modified_at,
                    })
                    .map(FetchResponse::Data)
            }
            Err(SdkError::ResponseError(r)) if r.raw().status().as_u16() == 304 => {
                Ok(FetchResponse::NotModified)
            }
            Err(e) => Err(SuperpositionError::NetworkError(format!(
                "Failed to fetch config: {}",
                e
            ))),
        };

        resp
    }
}

#[async_trait]
impl SuperpositionDataSource for HttpDataSource {
    async fn fetch_config(
        &self,
        if_modified_since: Option<DateTime<Utc>>,
    ) -> Result<FetchResponse<ConfigData>> {
        self.fetch_config_with_filters(None, None, if_modified_since)
            .await
    }

    async fn fetch_filtered_config(
        &self,
        context: Option<Map<String, Value>>,
        prefix_filter: Option<Vec<String>>,
        if_modified_since: Option<DateTime<Utc>>,
    ) -> Result<FetchResponse<ConfigData>> {
        self.fetch_config_with_filters(context, prefix_filter, if_modified_since)
            .await
    }

    async fn fetch_active_experiments(
        &self,
        if_modified_since: Option<DateTime<Utc>>,
    ) -> Result<FetchResponse<ExperimentData>> {
        self.fetch_experiments_with_filters(None, None, if_modified_since, None)
            .await
    }

    async fn fetch_candidate_active_experiments(
        &self,
        context: Option<Map<String, Value>>,
        prefix_filter: Option<Vec<String>>,
        if_modified_since: Option<DateTime<Utc>>,
    ) -> Result<FetchResponse<ExperimentData>> {
        self.fetch_experiments_with_filters(
            context,
            prefix_filter,
            if_modified_since,
            Some(DimensionMatchStrategy::Exact),
        )
        .await
    }

    async fn fetch_matching_active_experiments(
        &self,
        context: Option<Map<String, Value>>,
        prefix_filter: Option<Vec<String>>,
        if_modified_since: Option<DateTime<Utc>>,
    ) -> Result<FetchResponse<ExperimentData>> {
        self.fetch_experiments_with_filters(
            context,
            prefix_filter,
            if_modified_since,
            Some(DimensionMatchStrategy::Subset),
        )
        .await
    }

    fn supports_experiments(&self) -> bool {
        true
    }

    async fn close(&self) -> Result<()> {
        Ok(())
    }
}