netidx-archive 0.32.0

netidx archive file format
Documentation
use crate::{
    config::{ConfigBuilder, PublishConfigBuilder, RecordConfigBuilder},
    logfile::{BatchItem, Seek, BATCH_POOL},
    recorder::{Recorder, State},
    recorder_client::{Client, Speed},
};
use anyhow::{Context, Result};
use arcstr::literal;
use chrono::prelude::*;
use futures::{
    channel::{mpsc, oneshot},
    future,
    prelude::*,
};
use netidx::{
    config::{
        file::{self as client, Auth as CAuth},
        Config as CConfig, DefaultAuthMech,
    },
    path::Path,
    publisher::{Publisher, PublisherBuilder, Value},
    resolver_server::{
        config::{
            file::{self as server, Auth as SAuth},
            Config as SConfig,
        },
        Server,
    },
    subscriber::{Event, Subscriber, SubscriberBuilder, UpdatesFlags},
};
use netidx_netproto::glob::GlobSet;
use std::{fs, time::Duration};
use tokio::{task, time};

struct Ctx {
    publisher: Publisher,
    subscriber: Subscriber,
    _server: Server,
}

impl Ctx {
    async fn new() -> Result<Self> {
        let server_cfg = server::ConfigBuilder::default()
            .member_servers(vec![server::MemberServerBuilder::default()
                .addr("127.0.0.1:0".parse().context("parsing addr")?)
                .bind_addr("127.0.0.1".parse().context("parsing ip addr")?)
                .auth(SAuth::Anonymous)
                .build()
                .context("member server config")?])
            .build()
            .context("server config")?;
        let server_cfg =
            SConfig::from_file(server_cfg).context("validating server config")?;
        let server =
            Server::new(server_cfg, false, 0).await.context("starting server")?;
        let client_cfg = client::ConfigBuilder::default()
            .addrs(vec![(*server.local_addr(), CAuth::Anonymous)])
            .default_auth(DefaultAuthMech::Anonymous)
            .default_bind_config("local")
            .build()
            .context("client config")?;
        let client_cfg =
            CConfig::from_file(client_cfg).context("validating client config")?;
        let publisher = PublisherBuilder::new(client_cfg.clone())
            .build()
            .await
            .context("building publisher")?;
        let subscriber = SubscriberBuilder::new(client_cfg.clone())
            .build()
            .context("building subscriber")?;
        Ok(Self { _server: server, publisher, subscriber })
    }
}

const PATH0: &str = "test-recorder";
const PATH1: &str = "test-embedded-recorder";
const SHARD: &str = "0";
const BASE0: &str = "/recorder";
const BASE1: &str = "/embedded-recorder";
const D0: &str = "/test/d0";
const D1: &str = "/test/d1";
const N: u64 = 1000;

#[tokio::test(flavor = "multi_thread")]
async fn recorder() -> Result<()> {
    let _ = env_logger::try_init();
    let _ = fs::remove_dir_all(PATH0);
    eprintln!("creating context");
    let ctx = Ctx::new().await.context("build creating context")?;
    eprintln!("context created");
    let record = RecordConfigBuilder::default()
        .try_spec(vec![literal!("/test/**")])
        .context("compiling spec")?
        .build()
        .context("build record config")?;
    let publish = PublishConfigBuilder::default()
        .bind("local".parse()?)
        .base(BASE0)
        .build()
        .context("publish config")?;
    let cfg = ConfigBuilder::default()
        .record([(SHARD.into(), record)])
        .publish(publish)
        .archive_directory(PATH0)
        .build()
        .context("build config")?;
    let (pub_tx, pub_rx) = oneshot::channel();
    let (fin_tx, fin_rx) = oneshot::channel();
    let publish = || {
        let publisher = ctx.publisher.clone();
        let ids = [
            publisher.publish(Path::from(D0), Value::Null)?,
            publisher.publish(Path::from(D1), Value::Null)?,
        ];
        Ok::<_, anyhow::Error>(async move {
            eprintln!("starting publisher task");
            publisher.flushed().await;
            eprintln!("paths published");
            let _ = pub_tx.send(());
            eprintln!("publisher waiting for clients");
            future::join_all(ids.iter().map(|pb| publisher.wait_client(pb.id()))).await;
            eprintln!("publisher got clients");
            for i in 0..N {
                let mut b = publisher.start_batch();
                for id in &ids {
                    id.update(&mut b, i)
                }
                b.commit(None).await;
                time::sleep(Duration::from_millis(1)).await;
            }
            let _ = fin_tx.send(());
        })
    };
    task::spawn(publish()?);
    eprintln!("waiting for publisher to start");
    pub_rx.await.map_err(|_| anyhow!("publish task died"))?;
    eprintln!("publisher started, starting recorder");
    let recorder = Recorder::start_with(
        cfg,
        Some(ctx.publisher.clone()),
        Some(ctx.subscriber.clone()),
    )
    .await
    .context("creating recorder")?;
    eprintln!("recorder started");
    let client = Client::new(&ctx.subscriber, &Path::from(BASE0))
        .context("create record client")?;
    fin_rx.await.map_err(|_| anyhow!("publish task died"))?;
    let session = client
        .session()
        .pos(Seek::Beginning)
        .context("set pos")?
        .speed(Speed::Unlimited)
        .build()
        .await
        .context("build session")?;
    eprintln!("recorder client session started");
    let (tx_up, mut rx_up) = mpsc::channel(10);
    let tx_up = [(UpdatesFlags::empty(), tx_up)];
    let (d0, d1) = (
        ctx.subscriber.subscribe_updates(session.data_path().append(D0), tx_up.clone()),
        ctx.subscriber.subscribe_updates(session.data_path().append(D1), tx_up),
    );
    eprintln!("waiting for initial subscription");
    future::join_all([d0.wait_subscribed(), d1.wait_subscribed()]).await;
    eprintln!("initial subscription complete, pressing play");
    session.set_state(State::Play).await.context("pressing play")?;
    let mut i = 0;
    while let Some(up) = rx_up.next().await {
        eprintln!("batch: {i} {up:?}");
        if i < 2 {
            assert_eq!(up.len(), 2);
            assert_eq!(up[0].1, Event::Update(Value::Null));
            assert_eq!(up[1].1, Event::Update(Value::Null));
        } else if i < 1002 {
            assert_eq!(up.len(), 2);
            assert_eq!(up[0].0, d0.id());
            assert_eq!(up[1].0, d1.id());
            assert_eq!(up[0].1, Event::Update(Value::U64(i - 2)));
            assert_eq!(up[1].1, Event::Update(Value::U64(i - 2)));
        } else if i == 1002 {
            assert_eq!(up.len(), 2);
            assert_eq!(up[0].1, Event::Update(Value::Null));
            assert_eq!(up[1].1, Event::Update(Value::Null));
            break;
        } else {
            panic!("unexpected number of batches")
        }
        i += 1;
    }
    drop(recorder);
    fs::remove_dir_all(PATH0)?;
    Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn embedded_recorder() -> Result<()> {
    let _ = env_logger::try_init();
    let _ = fs::remove_dir_all(PATH1);
    eprintln!("creating context");
    let ctx = Ctx::new().await.context("build creating context")?;
    eprintln!("context created");
    let record = RecordConfigBuilder::default()
        .spec(GlobSet::new(true, [])?)
        .build()
        .context("build record config")?;
    let publish = PublishConfigBuilder::default()
        .bind("local".parse()?)
        .base(BASE1)
        .build()
        .context("publish config")?;
    let cfg = ConfigBuilder::default()
        .record([(SHARD.into(), record)])
        .publish(publish)
        .archive_directory(PATH1)
        .build()
        .context("build config")?;
    eprintln!("starting recorder");
    let recorder = Recorder::start_with(
        cfg,
        Some(ctx.publisher.clone()),
        Some(ctx.subscriber.clone()),
    )
    .await
    .context("start recorder")?;
    eprintln!("recorder started");
    let mut archive = recorder
        .shards
        .writers
        .lock()
        .remove(&recorder.shards.by_name[SHARD])
        .ok_or_else(|| anyhow!("missing writer for shard {SHARD}"))?;
    eprintln!("got record shard");
    let (fin_tx, fin_rx) = oneshot::channel();
    task::spawn(async move {
        eprintln!("record task started");
        let paths = [Path::from(D0), Path::from(D1)];
        archive.add_paths(paths.iter()).unwrap();
        let ids = [
            archive.id_for_path(&paths[0]).unwrap(),
            archive.id_for_path(&paths[1]).unwrap(),
        ];
        let mut batch = BATCH_POOL.take();
        for i in 0..N {
            batch.clear();
            for id in &ids {
                batch.push(BatchItem(*id, Event::Update(Value::U64(i))));
            }
            archive.add_batch(false, Utc::now(), &batch).unwrap();
            task::block_in_place(|| archive.flush_all().unwrap());
        }
        eprintln!("record task finished");
        let _ = fin_tx.send(());
    });
    eprintln!("connecting to the recorder");
    let client = Client::new(&ctx.subscriber, &Path::from(BASE1))
        .context("create record client")?;
    eprintln!("connected; waiting for write task to finish");
    fin_rx.await.map_err(|_| anyhow!("publish task died"))?;
    eprintln!("starting session");
    let session = client
        .session()
        .pos(Seek::Beginning)
        .context("set pos")?
        .speed(Speed::Unlimited)
        .build()
        .await
        .context("build session")?;
    eprintln!("recorder client session started");
    let (tx_up, mut rx_up) = mpsc::channel(10);
    let tx_up = [(UpdatesFlags::empty(), tx_up)];
    let (d0, d1) = (
        ctx.subscriber.subscribe_updates(session.data_path().append(D0), tx_up.clone()),
        ctx.subscriber.subscribe_updates(session.data_path().append(D1), tx_up),
    );
    eprintln!("waiting for initial subscription");
    future::join_all([d0.wait_subscribed(), d1.wait_subscribed()]).await;
    eprintln!("initial subscription complete, pressing play");
    session.set_state(State::Play).await.context("pressing play")?;
    let mut i = 0;
    while let Some(up) = rx_up.next().await {
        eprintln!("batch {i}: {up:?}");
        if i < 1 {
            assert_eq!(up.len(), 2);
            assert_eq!(up[0].1, Event::Update(Value::Null));
            assert_eq!(up[1].1, Event::Update(Value::Null));
        } else if i < 1001 {
            assert_eq!(up.len(), 2);
            assert_eq!(up[0].0, d0.id());
            assert_eq!(up[1].0, d1.id());
            assert_eq!(up[0].1, Event::Update(Value::U64(i - 1)));
            assert_eq!(up[1].1, Event::Update(Value::U64(i - 1)));
        }
        if i == 1000 {
            break;
        }
        i += 1;
    }
    drop(recorder);
    fs::remove_dir_all(PATH1)?;
    Ok(())
}