// nats 0.26.0
//
// A Rust NATS client (deprecated — use async-nats instead)
// Documentation
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(feature = "unstable")]

use rand::prelude::*;
use std::io::Read;

/// Drains `reader` by issuing `read` calls with a scratch buffer of `chunk_size` bytes and
/// returns every byte that was produced.
///
/// Error policy matches `Read::read_to_end`: `ErrorKind::Interrupted` is retried, any other
/// error panics. This makes a broken reader fail the test instead of looping forever.
///
/// # Panics
///
/// Panics if `chunk_size` is zero (a zero-length buffer would make `read` return `Ok(0)`
/// and look like end-of-stream) or if the reader reports a non-retryable error.
fn read_in_chunks(mut reader: impl Read, chunk_size: usize) -> Vec<u8> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");

    let mut buffer = vec![0u8; chunk_size];
    let mut collected = Vec::new();
    loop {
        match reader.read(&mut buffer) {
            // A zero-length read signals end of stream.
            Ok(0) => return collected,
            Ok(n) => collected.extend_from_slice(&buffer[..n]),
            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(error) => panic!("reading object with {chunk_size}-byte buffer failed: {error}"),
        }
    }
}

/// Stores random payloads under a single name and checks that they read back unchanged,
/// both through `read_to_end` and through small fixed-size `read` calls.
#[test]
fn object_random() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = nats::connect(server.client_url()).unwrap();
    let context = nats::jetstream::new(client);

    let bucket = context
        .create_object_store(&nats::object_store::Config {
            bucket: "OBJECTS".to_string(),
            ..Default::default()
        })
        .unwrap();

    // Nothing has been put yet, so looking up "FOO" must fail.
    bucket.info("FOO").unwrap_err();

    let mut rng = rand::thread_rng();
    // NOTE(review): the odd size is presumably chosen so the payload is not a multiple of
    // the store's chunk size — confirm against the object store defaults.
    let mut bytes = vec![0; 1024 * 1024 + 22];
    rng.try_fill_bytes(&mut bytes).unwrap();

    bucket.put("FOO", &mut bytes.as_slice()).unwrap();

    let object_info = bucket.info("FOO").unwrap();
    assert!(!object_info.nuid.is_empty());
    assert_eq!(object_info.size, bytes.len());

    // Fetch the object twice to check that repeated reads return the same content.
    for _ in 0..2 {
        let mut result = Vec::new();
        bucket.get("FOO").unwrap().read_to_end(&mut result).unwrap();
        assert_eq!(result, bytes);
    }

    // Overwrite the object with a fresh random payload of the same size.
    let mut bytes = vec![0; 1024 * 1024 + 22];
    rng.try_fill_bytes(&mut bytes).unwrap();
    bucket.put("FOO", &mut bytes.as_slice()).unwrap();

    // Read the new payload back through caller buffers of several small sizes.
    for chunk_size in [1, 2, 128] {
        let object = bucket.get("FOO").unwrap();
        let result = read_in_chunks(object, chunk_size);
        assert_eq!(result, bytes);
    }
}

/// Sealing a bucket must be reflected in the `sealed` flag of the `OBJ_OBJECTS` stream.
#[test]
fn object_sealed() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("OBJECTS"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    store.seal().unwrap();

    // NOTE(review): `OBJ_OBJECTS` is presumably the stream backing the `OBJECTS` bucket.
    let backing_stream = js.stream_info("OBJ_OBJECTS").unwrap();
    assert!(backing_stream.config.sealed);
}

/// Deleting an object must leave exactly one message in the stream and an info entry
/// flagged as deleted.
#[test]
fn object_delete() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("OBJECTS"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    let payload = b"abc".repeat(100);
    store.put("A", &mut payload.as_slice()).unwrap();
    store.delete("A").unwrap();

    // Only a single message may remain after the delete: the delete marker.
    let remaining_messages = js.stream_info("OBJ_OBJECTS").unwrap().state.messages;
    assert_eq!(remaining_messages, 1);

    // The marker must still be reachable through `info` and report the object as deleted;
    // it is what drives the watch functionality.
    let marker = store.info("A").unwrap();
    assert!(marker.deleted);
}

/// Putting and then deleting a second object must grow the stream by exactly one message.
#[test]
fn object_multiple_delete() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("2OD"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    let first_payload = b"A".repeat(100);
    store.put("A", &mut first_payload.as_slice()).unwrap();

    // Baseline message count with only "A" stored; used to verify that deleting "B"
    // removes its chunks and metadata.
    let messages_before = js.stream_info("OBJ_2OD").unwrap().state.messages;

    let second_payload = b"B".repeat(200);
    store.put("B", &mut second_payload.as_slice()).unwrap();

    // Remove "B" again.
    store.delete("B").unwrap();

    // NOTE(review): the single extra message is presumably B's delete marker.
    let messages_after = js.stream_info("OBJ_2OD").unwrap().state.messages;
    assert_eq!(messages_after, messages_before + 1);
}

/// Checks which object names `put` accepts and which it rejects.
#[test]
fn object_names() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("NAMES"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    let no_data: Vec<u8> = Vec::new();
    // Stores an empty object under `name` and hands back the raw result for inspection.
    let put_empty = |name: &str| store.put(name, &mut no_data.as_slice());

    // File-name style names containing a dot are accepted.
    put_empty("foo.bar").unwrap();

    // Names containing spaces are accepted.
    put_empty("foo bar").unwrap();

    // The NATS subject wildcard tokens are asserted to be accepted as object names.
    // NOTE(review): presumably because names are encoded before being used in subjects.
    put_empty("*").unwrap();
    put_empty(">").unwrap();

    // An empty name is the only case here that must be rejected.
    put_empty("").unwrap_err();
}

/// The info returned for a stored object must carry a positive `modified` timestamp.
#[test]
fn object_info_modified() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("NAMES"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    let no_data: Vec<u8> = Vec::new();
    store.put("foo.bar", &mut no_data.as_slice()).unwrap();

    let stored = store.info("foo.bar").unwrap();

    // A value of zero or below would mean the timestamp was never filled in.
    assert!(stored.modified.unix_timestamp_nanos() > 0);
}

/// Every `put` issued after a watch is opened must be reported by the watcher, in order,
/// with the matching name and size.
#[test]
fn object_watch() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("WATCH"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    // The watcher is opened before any object is stored.
    let mut watcher = store.watch().unwrap();

    // One empty and one non-empty object, stored and observed one after the other.
    let uploads: [(&str, Vec<u8>); 2] = [("foo", vec![]), ("bar", vec![1, 2, 3, 4, 5])];
    for (name, payload) in uploads {
        store.put(name, &mut payload.as_slice()).unwrap();

        let announced = watcher.next().unwrap();
        assert_eq!(announced.name, name);
        assert_eq!(announced.size, payload.len());
    }
}

/// Stores fixture files and checks that each object reports the expected SHA-256 digest
/// and reads back byte-for-byte.
#[test]
fn object_digest() {
    use std::io::Read;

    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(server.client_url()).unwrap();
    let js = nats::jetstream::new(connection);

    let config = nats::object_store::Config {
        bucket: String::from("bucket"),
        ..Default::default()
    };
    let store = js.create_object_store(&config).unwrap();

    // (fixture path, expected digest value without the "SHA-256=" prefix)
    let fixtures = [
        (
            "tests/configs/digests/digester_test_bytes_000100.txt",
            "IdgP4UYMGt47rgecOqFoLrd24AXukHf5-SVzqQ5Psg8=",
        ),
        (
            "tests/configs/digests/digester_test_bytes_001000.txt",
            "DZj4RnBpuEukzFIY0ueZ-xjnHY4Rt9XWn4Dh8nkNfnI=",
        ),
        (
            "tests/configs/digests/digester_test_bytes_010000.txt",
            "RgaJ-VSJtjNvgXcujCKIvaheiX_6GRCcfdRYnAcVy38=",
        ),
        (
            "tests/configs/digests/digester_test_bytes_100000.txt",
            "yan7pwBVnC1yORqqgBfd64_qAw6q9fNA60_KRiMMooE=",
        ),
    ];

    for (path, digest) in fixtures {
        let contents = std::fs::read(path).unwrap();

        // The fixture path doubles as the object name.
        store.put(path, &mut contents.as_slice()).unwrap();

        let mut stored = store.get(path).unwrap();
        assert_eq!(stored.info.digest, format!("SHA-256={digest}"));

        let mut read_back = Vec::new();
        stored.read_to_end(&mut read_back).unwrap();
        assert_eq!(read_back, contents);
    }
}