#![cfg(feature = "unstable")]
use rand::prelude::*;
use std::io::Read;
/// Round-trips randomly generated payloads through an object store bucket and
/// checks that they come back intact, both when read in one go and when read
/// through small fixed-size buffers.
#[test]
fn object_random() {
    /// Drains `reader` through a scratch buffer of `chunk_size` bytes and
    /// returns everything that was read.
    ///
    /// Interrupted reads are retried. Any other I/O error fails the test right
    /// away; silently retrying it would make the test hang instead of fail.
    fn read_in_chunks(mut reader: impl Read, chunk_size: usize) -> Vec<u8> {
        let mut collected = Vec::new();
        let mut buffer = vec![0u8; chunk_size];
        loop {
            match reader.read(&mut buffer) {
                // A zero-length read marks the end of the object.
                Ok(0) => break,
                Ok(n) => collected.extend_from_slice(&buffer[..n]),
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}
                Err(err) => panic!("failed to read object: {err}"),
            }
        }
        collected
    }

    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = nats::connect(server.client_url()).unwrap();
    let context = nats::jetstream::new(client);
    let bucket = context
        .create_object_store(&nats::object_store::Config {
            bucket: "OBJECTS".to_string(),
            ..Default::default()
        })
        .unwrap();

    // Nothing has been stored under this name yet, so the lookup must fail.
    bucket.info("FOO").unwrap_err();

    // Payload of 1 MiB plus 22 bytes of random data.
    // NOTE(review): the odd size presumably avoids aligning with internal
    // chunk boundaries; confirm against the object store implementation.
    let mut rng = rand::thread_rng();
    let mut bytes = vec![0; 1024 * 1024 + 22];
    rng.try_fill_bytes(&mut bytes).unwrap();
    bucket.put("FOO", &mut bytes.as_slice()).unwrap();

    let object_info = bucket.info("FOO").unwrap();
    assert!(!object_info.nuid.is_empty());
    assert_eq!(object_info.size, bytes.len());

    // Fetch the object twice to check that it can be read more than once.
    for _ in 0..2 {
        let mut result = Vec::new();
        bucket.get("FOO").unwrap().read_to_end(&mut result).unwrap();
        assert_eq!(result, bytes);
    }

    // Overwrite the object with fresh random data, then read it back through
    // buffers of several sizes.
    let mut bytes = vec![0; 1024 * 1024 + 22];
    rng.try_fill_bytes(&mut bytes).unwrap();
    bucket.put("FOO", &mut bytes.as_slice()).unwrap();

    for chunk_size in [1, 2, 128] {
        let object = bucket.get("FOO").unwrap();
        assert_eq!(read_in_chunks(object, chunk_size), bytes);
    }
}
/// Seals a freshly created bucket and checks that the stream looked up as
/// `OBJ_OBJECTS` reports `sealed` in its configuration.
#[test]
fn object_sealed() {
    let test_server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(test_server.client_url()).unwrap();
    let jetstream = nats::jetstream::new(connection);

    let store_config = nats::object_store::Config {
        bucket: String::from("OBJECTS"),
        ..Default::default()
    };
    let store = jetstream.create_object_store(&store_config).unwrap();

    store.seal().unwrap();

    let sealed = jetstream
        .stream_info("OBJ_OBJECTS")
        .unwrap()
        .config
        .sealed;
    assert!(sealed);
}
/// Stores an object, deletes it, and checks that the stream looked up as
/// `OBJ_OBJECTS` then holds exactly one message while the object's info is
/// still retrievable and flagged as deleted.
#[test]
fn object_delete() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = nats::connect(server.client_url()).unwrap();
    let context = nats::jetstream::new(client);
    let bucket = context
        .create_object_store(&nats::object_store::Config {
            bucket: "OBJECTS".to_string(),
            ..Default::default()
        })
        .unwrap();

    // `repeat` already returns an owned `Vec<u8>`; no further copy is needed.
    let data = b"abc".repeat(100);
    bucket.put("A", &mut data.as_slice()).unwrap();
    bucket.delete("A").unwrap();

    let stream_info = context.stream_info("OBJ_OBJECTS").unwrap();
    assert_eq!(stream_info.state.messages, 1);

    let object_info = bucket.info("A").unwrap();
    assert!(object_info.deleted);
}
/// Stores object `A`, records the stream's message count, then stores and
/// deletes object `B` and checks that the count grew by exactly one.
#[test]
fn object_multiple_delete() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = nats::connect(server.client_url()).unwrap();
    let context = nats::jetstream::new(client);
    let bucket = context
        .create_object_store(&nats::object_store::Config {
            bucket: "2OD".to_string(),
            ..Default::default()
        })
        .unwrap();

    // `repeat` already returns an owned `Vec<u8>`; no further copy is needed.
    let data = b"A".repeat(100);
    bucket.put("A", &mut data.as_slice()).unwrap();

    // Baseline taken while only `A` is stored.
    let old_stream_info = context.stream_info("OBJ_2OD").unwrap();

    let data = b"B".repeat(200);
    bucket.put("B", &mut data.as_slice()).unwrap();
    bucket.delete("B").unwrap();

    let new_stream_info = context.stream_info("OBJ_2OD").unwrap();
    assert_eq!(
        new_stream_info.state.messages,
        old_stream_info.state.messages + 1
    );
}
/// Checks which object names `put` accepts: names containing a dot, a space,
/// `*` or `>` must succeed, while the empty name must be rejected.
#[test]
fn object_names() {
    let test_server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(test_server.client_url()).unwrap();
    let jetstream = nats::jetstream::new(connection);

    let store_config = nats::object_store::Config {
        bucket: String::from("NAMES"),
        ..Default::default()
    };
    let store = jetstream.create_object_store(&store_config).unwrap();

    // Every object in this test is stored with an empty payload.
    let no_data: Vec<u8> = Vec::new();

    for accepted_name in ["foo.bar", "foo bar", "*", ">"] {
        store.put(accepted_name, &mut no_data.as_slice()).unwrap();
    }

    store.put("", &mut no_data.as_slice()).unwrap_err();
}
/// Stores an empty object and checks that its info carries a modification
/// time whose Unix timestamp in nanoseconds is positive.
#[test]
fn object_info_modified() {
    let test_server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(test_server.client_url()).unwrap();
    let jetstream = nats::jetstream::new(connection);

    let store_config = nats::object_store::Config {
        bucket: String::from("NAMES"),
        ..Default::default()
    };
    let store = jetstream.create_object_store(&store_config).unwrap();

    let no_data: Vec<u8> = Vec::new();
    store.put("foo.bar", &mut no_data.as_slice()).unwrap();

    let modified_nanos = store
        .info("foo.bar")
        .unwrap()
        .modified
        .unix_timestamp_nanos();
    assert!(modified_nanos > 0);
}
/// Opens a watcher on a bucket and checks that each subsequent `put` yields
/// an update carrying the stored object's name and size.
#[test]
fn object_watch() {
    let test_server = nats_server::run_server("tests/configs/jetstream.conf");
    let connection = nats::connect(test_server.client_url()).unwrap();
    let jetstream = nats::jetstream::new(connection);

    let store_config = nats::object_store::Config {
        bucket: String::from("WATCH"),
        ..Default::default()
    };
    let store = jetstream.create_object_store(&store_config).unwrap();

    // The watcher is created before any object is stored.
    let mut updates = store.watch().unwrap();

    // (object name, payload) pairs, stored one at a time; each put is followed
    // by pulling exactly one update from the watcher.
    let cases: [(&str, Vec<u8>); 2] = [("foo", vec![]), ("bar", vec![1, 2, 3, 4, 5])];
    for (name, payload) in cases {
        store.put(name, &mut payload.as_slice()).unwrap();

        let info = updates.next().unwrap();
        assert_eq!(info.name, name);
        assert_eq!(info.size, payload.len());
    }
}
/// Stores a set of fixture files and checks that each stored object reports
/// the expected `SHA-256=` digest and reads back byte-for-byte.
#[test]
fn object_digest() {
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = nats::connect(server.client_url()).unwrap();
    let jetstream = nats::jetstream::new(client);
    let bucket = jetstream
        .create_object_store(&nats::object_store::Config {
            bucket: "bucket".to_string(),
            ..Default::default()
        })
        .unwrap();

    // (fixture path, expected digest without the `SHA-256=` prefix) pairs.
    let cases = [
        (
            "tests/configs/digests/digester_test_bytes_000100.txt",
            "IdgP4UYMGt47rgecOqFoLrd24AXukHf5-SVzqQ5Psg8=",
        ),
        (
            "tests/configs/digests/digester_test_bytes_001000.txt",
            "DZj4RnBpuEukzFIY0ueZ-xjnHY4Rt9XWn4Dh8nkNfnI=",
        ),
        (
            "tests/configs/digests/digester_test_bytes_010000.txt",
            "RgaJ-VSJtjNvgXcujCKIvaheiX_6GRCcfdRYnAcVy38=",
        ),
        (
            "tests/configs/digests/digester_test_bytes_100000.txt",
            "yan7pwBVnC1yORqqgBfd64_qAw6q9fNA60_KRiMMooE=",
        ),
    ];

    for (filename, digest) in cases {
        // Name the fixture in the failure so a missing file is easy to spot.
        let file = std::fs::read(filename)
            .unwrap_or_else(|err| panic!("failed to read fixture {filename}: {err}"));
        bucket.put(filename, &mut file.as_slice()).unwrap();

        let mut object = bucket.get(filename).unwrap();
        assert_eq!(object.info.digest, format!("SHA-256={digest}"));

        let mut result = Vec::new();
        object.read_to_end(&mut result).unwrap();
        assert_eq!(result, file);
    }
}