ax_core 0.3.2

Core library implementing the functions of ax
Documentation
use ax_core::{
    swarm::BanyanStore,
    trees::query::{LamportQuery, TagExprQuery, TimeQuery},
};
use ax_types::{app_id, tags, AppId, Payload};
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};

/// Example event body used by the `banyan_multi_node` test.
///
/// Derives serde's `Serialize` and `Deserialize`; the test hands a reference
/// to it to `Payload::compact` to build the payload that gets appended.
#[derive(Serialize, Deserialize)]
struct MyEvent {
    /// List of free-form strings carried by the event.
    things_are_happening: Vec<String>,
}

/// Two-store round trip: events appended on one `BanyanStore` must show up,
/// in order, on a tag-filtered stream of a second store.
///
/// Steps:
/// 1. Create two test stores and register the second store's first listener
///    address (under its peer id) with the first store.
/// 2. Spawn a task that first waits for one item from the second store's
///    known-streams stream, then reads two events from an ordered, filtered
///    stream and checks index and payload of each.
/// 3. Append the same event twice on the first store.
/// 4. Await the spawned task so its assertion failures fail the test.
#[tokio::test(flavor = "multi_thread")]
async fn banyan_multi_node() {
    ax_core::util::setup_logger();

    let writer = BanyanStore::test("a").await.unwrap();
    let reader = BanyanStore::test("b").await.unwrap();

    // Tell the writer where the reader can be reached.
    let reader_peer = reader.ipfs().local_peer_id();
    let reader_addr = reader.ipfs().listeners()[0].clone();
    writer.ipfs().clone().add_address(reader_peer, reader_addr);

    // Application id under which both events are appended.
    fn test_app_id() -> AppId {
        app_id!("test")
    }

    let event_tags = tags!("event");
    let filter = TagExprQuery::new(
        vec![event_tags.clone().into()],
        LamportQuery::all(),
        TimeQuery::all(),
    );

    let event = MyEvent {
        things_are_happening: vec!["hello world".to_string()],
    };
    // Every payload in this test is built from the same event value.
    let make_payload = || Payload::compact(&event).unwrap();

    let expected_payload = make_payload();
    // The reader task is spawned before anything is appended.
    // NOTE(review): `reader_task.await` below has no timeout, so the test
    // blocks for as long as the expected events do not arrive.
    let reader_task = tokio::spawn(async move {
        let mut known_streams = reader.stream_known_streams();
        let first_known = known_streams.next().await.unwrap();
        tracing::info!("known: {}", first_known);

        let mut events = reader.stream_filtered_stream_ordered(filter);
        // NOTE(review): the first tuple element is expected to be 4 and then 5;
        // presumably an offset/index that does not start at zero because of
        // store setup — confirm against the stream API.
        for expected_index in [4, 5] {
            let (index, key, received) = events.next().await.unwrap().unwrap();
            tracing::info!("{:?}", key);
            assert_eq!(index, expected_index);
            assert_eq!(received, expected_payload);
        }
    });

    let first_batch = vec![(event_tags.clone(), make_payload())];
    writer.append(test_app_id(), first_batch).await.unwrap();

    let second_batch = vec![(event_tags, make_payload())];
    writer.append(test_app_id(), second_batch).await.unwrap();

    reader_task.await.unwrap();
}