couchbase-core 1.0.1

Couchbase SDK core networking and protocol implementation, not intended for direct use
Documentation
/*
 *
 *  * Copyright (c) 2025 Couchbase, Inc.
 *  *
 *  * Licensed under the Apache License, Version 2.0 (the "License");
 *  * you may not use this file except in compliance with the License.
 *  * You may obtain a copy of the License at
 *  *
 *  *    http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */

use crate::common::node_version::{NodeEdition, NodeVersion};
use crate::common::test_agent::TestAgent;
use couchbase_core::agent::Agent;
use couchbase_core::error::{Error, ErrorKind};
use couchbase_core::features::BucketFeature;
use couchbase_core::memdx::error::ServerErrorKind;
use couchbase_core::mgmtx::responses::{
    CreateCollectionResponse, CreateScopeResponse, DeleteCollectionResponse, DeleteScopeResponse,
};
use couchbase_core::options::crud::GetCollectionIdOptions;
use couchbase_core::options::management::{
    CreateCollectionOptions, CreateScopeOptions, DeleteCollectionOptions, DeleteScopeOptions,
    EnsureManifestOptions,
};
use couchbase_core::{error, memdx};
use rand::distr::Alphanumeric;
use rand::{rng, Rng, RngExt};
use std::ops::Add;
use std::time::Duration;
use tokio::time::{timeout_at, Instant};

pub fn generate_key() -> Vec<u8> {
    generate_string_key().into_bytes()
}

pub fn generate_key_with_letter_prefix() -> String {
    let mut name = generate_string_key();
    loop {
        if !name.as_bytes()[0].is_ascii_alphabetic() {
            name = name[1..].to_string();
        } else {
            break;
        }
    }

    name
}

pub fn generate_string_key() -> String {
    rng()
        .sample_iter(&Alphanumeric)
        .take(30)
        .map(char::from)
        .collect::<String>()
}

pub fn generate_bytes_value(len: usize) -> Vec<u8> {
    rng()
        .sample_iter(&Alphanumeric)
        .take(len)
        .map(char::from)
        .collect::<String>()
        .into_bytes()
}

pub fn generate_string_value(len: usize) -> String {
    rng()
        .sample_iter(&Alphanumeric)
        .take(len)
        .map(char::from)
        .collect::<String>()
}

pub async fn create_collection_and_wait_for_kv(
    agent: &Agent,
    bucket_name: &str,
    scope_name: &str,
    collection_name: &str,
    deadline: Instant,
) {
    agent
        .create_collection(&CreateCollectionOptions::new(
            bucket_name,
            scope_name,
            collection_name,
        ))
        .await
        .unwrap();

    let fut = || async {
        loop {
            let resp = agent
                .get_collection_id(GetCollectionIdOptions::new(scope_name, collection_name))
                .await;
            if resp.is_ok() {
                break;
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };

    timeout_at(deadline, fut()).await.unwrap();
}

pub async fn delete_collection_and_wait_for_kv(
    agent: &Agent,
    bucket_name: &str,
    scope_name: &str,
    collection_name: &str,
    deadline: Instant,
) {
    agent
        .delete_collection(&DeleteCollectionOptions::new(
            bucket_name,
            scope_name,
            collection_name,
        ))
        .await
        .unwrap();

    let fut = || async {
        loop {
            let resp = agent
                .get_collection_id(GetCollectionIdOptions::new(scope_name, collection_name))
                .await;
            if let Some(e) = resp.err() {
                if is_memdx_error(&e)
                    .unwrap()
                    .is_server_error_kind(ServerErrorKind::UnknownCollectionName)
                {
                    break;
                }
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };

    timeout_at(deadline, fut()).await.unwrap();
}

pub async fn delete_scope_and_wait_for_kv(
    agent: &Agent,
    bucket_name: &str,
    scope_name: &str,
    collection_name: &str,
    deadline: Instant,
) {
    agent
        .delete_scope(&DeleteScopeOptions::new(bucket_name, scope_name))
        .await
        .unwrap();

    let fut = || async {
        loop {
            let resp = agent
                .get_collection_id(GetCollectionIdOptions::new(scope_name, collection_name))
                .await;
            if let Some(e) = resp.err() {
                if is_memdx_error(&e)
                    .unwrap()
                    .is_server_error_kind(ServerErrorKind::UnknownScopeName)
                {
                    break;
                }
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };

    timeout_at(deadline, fut()).await.unwrap();
}

pub fn is_memdx_error(e: &Error) -> Option<&memdx::error::Error> {
    match e.kind() {
        ErrorKind::Memdx(err, ..) => Some(err),
        _ => None,
    }
}

pub async fn delete_scope(
    agent: &TestAgent,
    bucket_name: &str,
    scope_name: &str,
) -> error::Result<DeleteScopeResponse> {
    let opts = &DeleteScopeOptions::new(bucket_name, scope_name);
    agent.delete_scope(opts).await
}

pub async fn delete_collection(
    agent: &TestAgent,
    bucket_name: &str,
    scope_name: &str,
    collection_name: &str,
) -> error::Result<DeleteCollectionResponse> {
    let opts = &DeleteCollectionOptions::new(bucket_name, scope_name, collection_name);
    agent.delete_collection(opts).await
}

pub async fn create_scope(
    agent: &TestAgent,
    bucket_name: &str,
    scope_name: &str,
) -> error::Result<CreateScopeResponse> {
    let opts = &CreateScopeOptions::new(bucket_name, scope_name);
    agent.create_scope(opts).await
}

pub async fn create_scope_and_ensure_exists(
    agent: &TestAgent,
    bucket_name: &str,
    scope_name: &str,
) {
    let res = create_scope(agent, bucket_name, scope_name).await.unwrap();

    agent
        .ensure_manifest(&EnsureManifestOptions::new(
            bucket_name,
            u64::from_str_radix(&res.manifest_uid, 16).unwrap(),
        ))
        .await
        .unwrap();
}

pub async fn create_collection(
    agent: &TestAgent,
    bucket_name: &str,
    scope_name: &str,
    collection_name: &str,
) -> error::Result<CreateCollectionResponse> {
    let opts = &CreateCollectionOptions::new(bucket_name, scope_name, collection_name);
    agent.create_collection(opts).await
}

pub async fn create_collection_and_ensure_exists(
    agent: &TestAgent,
    bucket_name: &str,
    scope_name: &str,
    collection_name: &str,
) {
    let res = create_collection(agent, bucket_name, scope_name, collection_name)
        .await
        .unwrap();

    agent
        .ensure_manifest(&EnsureManifestOptions::new(
            bucket_name,
            u64::from_str_radix(&res.manifest_uid, 16).unwrap(),
        ))
        .await
        .unwrap();
}

pub async fn feature_supported(agent: &TestAgent, feature: BucketFeature) -> bool {
    agent.bucket_features().await.unwrap().contains(&feature)
}

pub async fn try_until<Fut, T>(
    deadline: Instant,
    sleep: Duration,
    fail_msg: impl AsRef<str>,
    mut f: impl FnMut() -> Fut,
) -> T
where
    Fut: std::future::Future<Output = Result<Option<T>, Error>>,
{
    while Instant::now() < deadline {
        let res = f().await;
        if let Ok(Some(r)) = res {
            return r;
        }

        if let Err(e) = res {
            tracing::error!("{e}");
        }

        tokio::time::sleep(sleep).await;
    }
    panic!("{}", fail_msg.as_ref());
}

pub async fn run_with_deadline<Resp, Fut>(deadline: Instant, f: Fut) -> Result<Resp, Error>
where
    Fut: std::future::Future<Output = Result<Resp, Error>>,
{
    timeout_at(deadline, f).await.unwrap()
}

pub async fn run_with_std_kv_deadline<Resp, Fut>(
    node_version: &NodeVersion,
    f: Fut,
) -> Result<Resp, Error>
where
    Fut: std::future::Future<Output = Result<Resp, Error>>,
{
    let timeout = match node_version.edition {
        Some(NodeEdition::Community) => Duration::from_millis(10000),
        _ => Duration::from_millis(2500),
    };

    timeout_at(Instant::now().add(timeout), f).await.unwrap()
}

pub async fn run_with_std_mgmt_deadline<Resp, Fut>(f: Fut) -> Result<Resp, Error>
where
    Fut: std::future::Future<Output = Result<Resp, Error>>,
{
    timeout_at(Instant::now().add(Duration::from_millis(10000)), f)
        .await
        .unwrap()
}

pub async fn run_with_std_query_deadline<Resp, Fut>(f: Fut) -> Result<Resp, Error>
where
    Fut: std::future::Future<Output = Result<Resp, Error>>,
{
    timeout_at(Instant::now().add(Duration::from_millis(10000)), f)
        .await
        .unwrap()
}

pub async fn run_with_std_search_deadline<Resp, Fut>(f: Fut) -> Result<Resp, Error>
where
    Fut: std::future::Future<Output = Result<Resp, Error>>,
{
    timeout_at(Instant::now().add(Duration::from_millis(10000)), f)
        .await
        .unwrap()
}

pub async fn run_with_std_ensure_deadline<Resp, Fut>(f: Fut) -> Result<Resp, Error>
where
    Fut: std::future::Future<Output = Result<Resp, Error>>,
{
    timeout_at(Instant::now().add(Duration::from_millis(30000)), f)
        .await
        .unwrap()
}

pub async fn ensure_manifest(agent: &Agent, bucket_name: &str, manifest_uid: String) {
    let ensure_opts =
        &EnsureManifestOptions::new(bucket_name, u64::from_str_radix(&manifest_uid, 16).unwrap());

    agent.ensure_manifest(ensure_opts).await.unwrap();
}