centraldogma 0.1.2

CentralDogma client for Rust
Documentation
#[macro_use]
mod utils;

use cd::{
    model::{Change, ChangeContent, CommitMessage, EntryContent, Query, Revision},
    ContentService, ProjectService, RepoService, WatchService,
};
use centraldogma as cd;

use std::{pin::Pin, time::Duration};

use anyhow::{ensure, Context, Result};
use futures::{
    future::{Future, FutureExt},
    StreamExt,
};
use serde_json::json;

struct TestContext {
    client: cd::Client,
    project: cd::model::Project,
    repo: cd::model::Repository,
}

async fn run_test<T>(test: T)
where
    for<'a> T: FnOnce(&'a mut TestContext) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>>,
{
    let mut ctx = setup().await.expect("Failed to setup for test");

    let result = test(&mut ctx).await;

    teardown(ctx).await.expect("Failed to teardown test setup");

    result.unwrap();
}

async fn setup() -> Result<TestContext> {
    let client = cd::Client::new("http://localhost:36462", None)
        .await
        .context("Failed to create client")?;
    let projects = client
        .list_projects()
        .await
        .context("Failed to list projects")?;
    assert_eq!(0, projects.len());

    let prj_name = "TestProject";
    let project = client
        .create_project(prj_name)
        .await
        .context("Failed to create new project")?;

    let repo_name = "TestRepo";
    let repo = client
        .project(prj_name)
        .create_repo(repo_name)
        .await
        .context("Failed to create new repository")?;

    Ok(TestContext {
        client,
        project,
        repo,
    })
}

async fn teardown(ctx: TestContext) -> Result<()> {
    ctx.client
        .project(&ctx.project.name)
        .remove_repo(&ctx.repo.name)
        .await
        .context("Failed to remove the repo")?;

    ctx.client
        .project(&ctx.project.name)
        .purge_repo(&ctx.repo.name)
        .await
        .context("Failed to remove the repo")?;

    ctx.client
        .remove_project(&ctx.project.name)
        .await
        .context("Failed to remove the project")?;

    ctx.client
        .purge_project(&ctx.project.name)
        .await
        .context("Failed to purge the project")?;

    Ok(())
}

fn watch_file_stream_test<'a>(
    ctx: &'a mut TestContext,
) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>> {
    async move {
        let r = ctx.client.repo(&ctx.project.name, &ctx.repo.name);

        let commit_msg = CommitMessage {
            summary: "File".to_string(),
            detail: None,
        };
        let file_change = vec![Change {
            path: "/a.json".to_string(),
            content: ChangeContent::UpsertJson(json!({"a": "b"})),
        }];

        r.push(Revision::HEAD, commit_msg, file_change)
            .await
            .context(here!("Failed to push file"))?;

        let watch_stream = r
            .watch_file_stream(&Query::of_json("/a.json").unwrap())
            .context(here!("Failed to get file watch stream"))?;

        let new_commit_msg = CommitMessage {
            summary: "change content".to_string(),
            detail: None,
        };
        let new_change = vec![Change {
            path: "/a.json".to_string(),
            content: ChangeContent::UpsertJson(json!({"a": "c"})),
        }];
        let new_push = async move {
            tokio::time::sleep(Duration::from_millis(1)).await;
            r.push(Revision::HEAD, new_commit_msg, new_change).await
        };

        let sleep = tokio::time::sleep(Duration::from_millis(10000));
        futures::pin_mut!(sleep);

        let mut s = watch_stream.take_until(sleep);
        let (wr, _) = tokio::join!(s.next(), new_push);

        println!("Watch result: {:?}", wr);
        ensure!(wr.is_some(), here!("Failed to get initial watch result"));
        let wr = wr.unwrap();

        ensure!(
            wr.entry.path == "/a.json",
            here!("Wrong entry path returned")
        );
        ensure!(
            matches!(wr.entry.content, EntryContent::Json(json) if json == json!({"a": "c"})),
            here!("Wrong entry content returned")
        );

        Ok(())
    }
    .boxed()
}

fn watch_repo_stream_test<'a>(
    ctx: &'a mut TestContext,
) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>> {
    async move {
        let r = ctx.client.repo(&ctx.project.name, &ctx.repo.name);

        let watch_stream = r
            .watch_repo_stream("")
            .context(here!("Failed to get file watch stream"))?;

        let new_commit_msg = CommitMessage {
            summary: "change content".to_string(),
            detail: None,
        };
        let new_change = vec![Change {
            path: "/a.json".to_string(),
            content: ChangeContent::UpsertJson(json!({"a": "c"})),
        }];
        let new_push = async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            r.push(Revision::HEAD, new_commit_msg, new_change).await
        };

        let sleep = tokio::time::sleep(Duration::from_millis(10000));
        futures::pin_mut!(sleep);

        let mut s = watch_stream.take_until(sleep);
        let (wr, _) = tokio::join!(s.next(), new_push);

        println!("Watch result: {:?}", wr);
        ensure!(wr.is_some(), here!("Failed to get initial watch result"));

        Ok(())
    }
    .boxed()
}

#[cfg(test)]
#[tokio::test]
async fn test_watch() {
    run_test(watch_file_stream_test).await;
    run_test(watch_repo_stream_test).await;
}