//! Command-line interface for an archiver
use aws_sdk_s3::{Client, Config, Credentials, Endpoint, Region};
use clap::Parser;

/// CLI for the S3 archiver
#[derive(Parser)]
#[command(author, about, long_about = None)]
pub struct Cli {
    /// Sets the S3 access key (MinIO username)
    #[arg(short, long, value_name = "S3_ACCESS_KEY")]
    access_key: String,

    /// Sets the S3 secret key (MinIO password)
    #[arg(short, long, value_name = "S3_SECRET_KEY")]
    secret_key: String,

    /// Sets the S3 endpoint to connect to
    ///
    /// The URL scheme does not have to be s3://; http:// and https:// work too.
    /// To reach the local S3 endpoint from outside docker-compose, use http://localhost:9000.
    /// TODO: how to do this when the archiver is deployed inside docker-compose or k8s
    #[arg(short, long, value_name = "S3_ENDPOINT")]
    endpoint: String,

    /// Sets the S3 region to connect to
    #[arg(short, long, value_name = "S3_REGION")]
    region: String,

    /// Sets the S3 bucket name to archive to
    ///
    /// This should be a bare bucket name such as "opensensor-archive",
    /// or any other valid S3 bucket name.
    #[arg(short, long, value_name = "S3_BUCKET_NAME")]
    bucket_name: String,

    /// Sensor name to archive data from
    ///
    /// Several pieces of information are derived from this:
    /// the Redpanda topic name is sensor_name + "-measurements", and
    /// the consumer group name is sensor_name + "-archiver".
    #[arg(long, value_name = "SENSOR_NAME")]
    sensor_name: String,

    /// How many messages to include per archive chunk
    #[arg(short, long, value_name = "MESSAGES_PER_CHUNK")]
    chunk_size: u64,

    /// Addresses of the brokers to connect to, as a comma-separated list in
    /// Kafka bootstrap form, e.g. 127.0.0.1:9010,127.0.0.1:9011,127.0.0.1:9012
    #[arg(short, long, value_name = "KAFKA_ADDRESSES")]
    kafka_addresses: String,
}
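
// A minimal invocation sketch. The binary name `archiver` and all values below
// are placeholders, not verified configuration; clap derives the kebab-case
// long flags from the field names above:
//
//   archiver --access-key minioadmin --secret-key minioadmin \
//       --endpoint http://localhost:9000 --region us-east-1 \
//       --bucket-name opensensor-archive --sensor-name thermometer \
//       --chunk-size 1000 --kafka-addresses 127.0.0.1:9010,127.0.0.1:9011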

impl Cli {
    /// Construct a new Cli for mocking + testing
    pub fn new(
        access_key: &str,
        secret_key: &str,
        endpoint: &str,
        region: &str,
        bucket_name: &str,
        sensor_name: &str,
        chunk_size: u64,
        kafka_addresses: &str,
    ) -> Self {
        Cli {
            access_key: access_key.to_owned(),
            secret_key: secret_key.to_owned(),
            endpoint: endpoint.to_owned(),
            region: region.to_owned(),
            bucket_name: bucket_name.to_owned(),
            sensor_name: sensor_name.to_owned(),
            chunk_size,
            kafka_addresses: kafka_addresses.to_owned(),
        }
    }

    /// S3 access key accessor
    pub fn access_key(&self) -> &str {
        &self.access_key
    }

    /// S3 secret key accessor
    pub fn secret_key(&self) -> &str {
        &self.secret_key
    }

    /// S3 endpoint accessor
    pub fn endpoint(&self) -> &str {
        &self.endpoint
    }

    /// S3 region accessor
    pub fn region(&self) -> &str {
        &self.region
    }

    /// S3 bucket name accessor
    pub fn bucket_name(&self) -> &str {
        &self.bucket_name
    }

    /// Sensor name accessor
    pub fn sensor_name(&self) -> &str {
        &self.sensor_name
    }

    /// Max number of records to put in a single archival chunk
    pub fn chunk_size(&self) -> u64 {
        self.chunk_size
    }

    /// Kafka addresses the archiver consumes from
    pub fn kafka_addresses(&self) -> &str {
        &self.kafka_addresses
    }

    /// Build an S3 client from the CLI configuration
    pub fn build_client(&self) -> Client {
        // A credential provider name is required, but the value doesn't seem to matter
        let provider_name = "opensensor-credentials";
        let creds = Credentials::new(
            &self.access_key,
            &self.secret_key,
            None,
            None,
            provider_name,
        );
        let s3_endpoint = Endpoint::immutable(
            self.endpoint
                .parse()
                .expect("S3 endpoint must be a valid URI"),
        );
        let config = Config::builder()
            .region(Region::new(self.region.clone()))
            .endpoint_resolver(s3_endpoint)
            .credentials_provider(creds)
            .build();
        Client::from_conf(config)
    }
}
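
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal round-trip sketch of the constructor and accessors. Every value
    // here is a placeholder (hypothetical credentials, endpoint, sensor, and
    // brokers), not real configuration.
    #[test]
    fn accessors_return_constructor_values() {
        let cli = Cli::new(
            "minioadmin",
            "minioadmin",
            "http://localhost:9000",
            "us-east-1",
            "opensensor-archive",
            "thermometer",
            1000,
            "127.0.0.1:9010,127.0.0.1:9011,127.0.0.1:9012",
        );
        assert_eq!(cli.access_key(), "minioadmin");
        assert_eq!(cli.secret_key(), "minioadmin");
        assert_eq!(cli.endpoint(), "http://localhost:9000");
        assert_eq!(cli.region(), "us-east-1");
        assert_eq!(cli.bucket_name(), "opensensor-archive");
        assert_eq!(cli.sensor_name(), "thermometer");
        assert_eq!(cli.chunk_size(), 1000);
        assert_eq!(
            cli.kafka_addresses(),
            "127.0.0.1:9010,127.0.0.1:9011,127.0.0.1:9012"
        );
        // build_client() only assembles configuration (no network I/O), so it
        // should be callable with these placeholder values as well.
        let _client = cli.build_client();
    }
}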