use std::{
collections::{BTreeSet, HashMap},
time::{SystemTime, UNIX_EPOCH},
};
use aws_manager::{self, sqs};
use aws_sdk_sqs::types::MessageAttributeValue;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);
let shared_config = aws_manager::load_config(Some(String::from("us-west-2")), None, None).await;
log::info!("region {:?}", shared_config.region().unwrap());
let sqs_manager = sqs::Manager::new(&shared_config);
let queue_name = format!("{}.fifo", random_manager::secure_string(10));
let queue_url = sqs_manager.create_fifo(&queue_name, 30, 1).await.unwrap();
sleep(Duration::from_secs(10)).await;
let msg_group_id = random_manager::secure_string(32);
for _ in 0..3 {
sleep(Duration::from_secs(1)).await;
let msg_body = random_manager::secure_string(100);
let msg_dedup_id = random_manager::secure_string(32);
let unix_ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut msg_attributes = HashMap::new();
msg_attributes.insert(
"sent-unix-second".to_string(),
MessageAttributeValue::builder()
.string_value(format!("{}", unix_ts.as_secs()))
.data_type("String")
.build(),
);
msg_attributes.insert(
random_manager::secure_string(10),
MessageAttributeValue::builder()
.string_value(random_manager::secure_string(10))
.data_type("String")
.build(),
);
let _ = sqs_manager
.send_msg_to_fifo(
&queue_url,
&msg_group_id,
Some(msg_dedup_id),
Some(msg_attributes),
&msg_body,
)
.await
.unwrap();
}
println!();
println!();
sleep(Duration::from_secs(5)).await;
let attr = sqs_manager.get_attributes(&queue_url).await.unwrap();
for (k, v) in attr.iter() {
log::info!("attribute '{:?}' = '{}'", k, v);
}
println!();
println!();
sleep(Duration::from_secs(5)).await;
let mut receipt_handles = Vec::new();
let mut msg_attribute_names = BTreeSet::new();
msg_attribute_names.insert(".*".to_string());
let msgs = sqs_manager
.recv_msgs(&queue_url, 10, 3, Some(msg_attribute_names))
.await
.unwrap();
for msg in &msgs {
log::info!("received message: {:?}", msg);
receipt_handles.push(msg.receipt_handle().unwrap().to_string());
}
println!();
println!();
sleep(Duration::from_secs(5)).await;
let attr = sqs_manager.get_attributes(&queue_url).await.unwrap();
for (k, v) in attr.iter() {
log::info!("attribute '{:?}' = '{}'", k, v);
}
println!();
println!();
sleep(Duration::from_secs(5)).await;
for receipt_handle in &receipt_handles {
sqs_manager
.delete_msg(&queue_url, &receipt_handle)
.await
.unwrap();
sqs_manager
.delete_msg(&queue_url, &receipt_handle)
.await
.unwrap();
}
println!();
println!();
sleep(Duration::from_secs(5)).await;
let attr = sqs_manager.get_attributes(&queue_url).await.unwrap();
for (k, v) in attr.iter() {
log::info!("queue attribute '{:?}' = '{}'", k, v);
}
println!();
println!();
sleep(Duration::from_secs(5)).await;
sqs_manager.delete(&queue_url).await.unwrap();
println!();
println!();
sleep(Duration::from_secs(5)).await;
sqs_manager.delete(&queue_url).await.unwrap();
}