//! Archive a Kafka topic to S3-compatible object storage
//!
//! Archiver is configured via command line arguments that allow users to specify the following:
//! - access-key: MINIO_ROOT_USER or AWS_ACCESS_KEY_ID as plaintext
//! - secret-key: MINIO_ROOT_PASSWORD or AWS_SECRET_ACCESS_KEY as plaintext
//! - endpoint: Address to connect to the s3-compatible storage at. This differs depending on whether you're connecting
//! from inside or outside the k8s cluster or docker compose environment. Specifying an s3 protocol scheme doesn't
//! appear to be required: http://localhost:9000 works fine for MinIO running inside docker compose with port 9000
//! exposed.
//! - region: MINIO_REGION_NAME or AWS_DEFAULT_REGION. The s3 region to connect to.
//! - bucket-name: Bucket to save sensor archive data to.
//! - sensor-name: Name of the sensor to archive data from. This name is used to generate the Kafka topic name to subscribe to
//! ("{sensor-name}-measurements"), the Kafka group_id associated with the consumer ("{sensor-name}-archiver"), and the prefix
//! prepended to all object names ("{sensor-name}/").
//! - chunk-size: How many sensor measurements to include in a single archive file. Any value works as long as the resulting
//! file is <2gb (the max size of a flatbuffer) and there is adequate system memory to hold the flatbuffers while they're
//! constructed, before they're written to s3. In practice, this should probably be in the low hundreds of mb, but it
//! depends on the data production rate of the sensor (see the sizing sketch after this list).
//! - kafka-addresses: Hostnames and ports of the brokers to connect to, in Kafka bootstrap-server form
//! (host:port, comma-separated).
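//!
//! As a rough sizing sketch (illustrative numbers, not measured from this repo): a sensor emitting 10kb measurements
//! at 10hz with a chunk-size of 10000 buffers about 10kb * 10000 = 100mb per archive file, flushing one archive
//! roughly every 1000 seconds. Scale chunk-size down if that per-chunk memory footprint or file size is too large.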
//!
//! Data is archived as a vector of chunk-size flatbuffer records, zstd compressed per archival file. To parse a file,
//! un-compress it and use the readers provided in the messages crate; readers can be generated for any of the programming
//! languages supported by flatbuffers (see the reading sketch below the example). Last archived offsets are saved
//! automatically in the consumer group topic offsets.
//!
//! # Example
//!
//! ```sh
//! cargo run --bin archiver -- --access-key user \
//! --secret-key user123456 \
//! --endpoint http://localhost:9000 \
//! --region opensensor-region \
//! --bucket-name opensensor-archive \
//! --sensor-name radar-2d \
//! --chunk-size 10000 \
//! --kafka-addresses 127.0.0.1:9010,127.0.0.1:9011,127.0.0.1:9012
//! ```
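//!
//! # Reading an archive
//!
//! A minimal sketch of reading an archive back, assuming the zstd crate for decompression; the root reader name
//! below (root_as_radar_vector_2d) is illustrative, so use whatever reader flatc generated for your schema's
//! vector type:
//!
//! ```ignore
//! // Read a previously downloaded archive file from local disk
//! let compressed = std::fs::read("radar-2d/2023-01-01T00:00:00+00:00")?;
//! // Archives are zstd-compressed flatbuffers, so decompress first...
//! let raw = zstd::decode_all(&compressed[..])?;
//! // ...then parse with the generated flatbuffers reader and iterate the records
//! let archive = messages::radar_2d::root_as_radar_vector_2d(&raw)?;
//! for measurement in archive.data().unwrap() {
//!     println!("theta_radians: {}", measurement.theta_radians());
//! }
//! ```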
use archiver::cli::Cli;
use archiver::error::ArchiveError;
use chrono::Utc;
use clap::Parser;
use flatbuffers::FlatBufferBuilder;
use futures_util::StreamExt;
use messages::radar_2d::{
    root_as_radar_measurement_2d, RadarMeasurement2d, RadarMeasurement2dArgs,
    RadarMeasurement2dFlatBufferBuilder, RadarVector2DBuilder,
};
use redpanda::{consumer::CommitMode, consumer::Consumer, message::Message, RedpandaBuilder};
use tracing::{event, Level};

#[tokio::main]
async fn main() -> Result<(), ArchiveError> {
    utility::configure_tracing();
    let cli = Cli::parse();
    run_archiver(cli).await?;
    Ok(())
}
/// Run a Kafka archiver, given a parsed command line configuration
///
/// # Parameters
///
/// - `cli` (`archiver::cli::Cli`): CLI configuration to run the archiver from
///
/// # Examples
///
/// ```ignore
/// let access_key = "USERNAME";
/// let secret_key = "SUPER_SECRET_PASSWORD";
/// let endpoint = "http://localhost:9000";
/// let region = "opensensor-region";
/// let bucket_name = "opensensor-archive";
/// let sensor_name = "radar-2d";
/// let chunk_size = 10000;
/// let kafka_addresses = "127.0.0.1:9010,127.0.0.1:9011,127.0.0.1:9012";
///
/// let cli = Cli::new(
///     access_key,
///     secret_key,
///     endpoint,
///     region,
///     bucket_name,
///     sensor_name,
///     chunk_size,
///     kafka_addresses,
/// );
///
/// run_archiver(cli).await?;
/// ```
///
/// TODO: implement an individual archiver for each message type, since serialization and
/// deserialization are type-specific
async fn run_archiver(cli: Cli) -> Result<(), ArchiveError> {
    let client = cli.build_client();
    // Configure Redpanda, disabling auto-commit so that the "{sensor_name}-archiver" consumer
    // group's offsets are committed only once the consumed records have been successfully
    // written to S3
    let mut builder = RedpandaBuilder::default();
    let group_id = format!("{}-archiver", cli.sensor_name());
    builder.set_group_id(&group_id);
    builder.set("enable.auto.commit", "false");
    builder.set_bootstrap_servers(cli.kafka_addresses());
    let topic = format!("{}-measurements", cli.sensor_name());
    let consumer = builder.build_consumer().unwrap();
    consumer.subscribe(&[&topic]).unwrap();
    let mut stream = consumer.stream();
    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());
    // Locals for archive chunk tracking
    let chunk_size = cli.chunk_size();
    let mut chunk_counter = 0;
    // Vector to save archive chunks to. Hard limit of 2GB per buffer due to the 32 bit flatbuffer
    // address space; the practical limit is somewhat less than this. The current implementation
    // relies on there being enough RAM to store all in-progress archive chunks in memory. The
    // allocation seems unavoidable: there is no other way to keep the intermediate references to
    // flatbuffer buffer data until the chunk is assembled.
    let mut archival_buffer: Vec<RadarMeasurement2dFlatBufferBuilder> = Vec::new();
    // Stream the topic, writing an archive to S3 every chunk_size messages
    while let Some(m) = stream.next().await {
        let bytes = m.as_ref().unwrap().payload();
        // If there's no payload, continue to the next message
        if bytes.is_none() {
            event!(
                Level::WARN,
                "Got empty Redpanda message payload, continuing to next message"
            );
            continue;
        }
        // We've already handled the Option::None case above, so this won't panic
        let bytes = bytes.unwrap();
        // Try to map the bytes to a radar_measurement_2d
        let measurement = root_as_radar_measurement_2d(bytes).unwrap();
        let radar_builder = RadarMeasurement2dFlatBufferBuilder::new(
            measurement.measurement_strengths().unwrap().bytes(),
            measurement.theta_radians(),
        )
        .unwrap();
        archival_buffer.push(radar_builder);
        // Only count messages that were actually buffered, so empty payloads can't shrink a chunk
        chunk_counter += 1;
        if chunk_counter == chunk_size {
            // Create a FlatBufferBuilder to construct the RadarVector2D from
            let mut fbb = FlatBufferBuilder::new();
            let mut offsets = Vec::new();
            // Drain in insertion order so the archive preserves the consumed message order
            // (popping off the back of the Vec would reverse it)
            for m in archival_buffer.drain(..) {
                let measurement_strengths_offset = fbb.create_vector(m.measurement_strengths());
                let offset = RadarMeasurement2d::create(
                    &mut fbb,
                    &RadarMeasurement2dArgs {
                        theta_radians: *m.theta_radians(),
                        measurement_strengths: Some(measurement_strengths_offset),
                    },
                );
                offsets.push(offset);
            }
            let data_offsets = fbb.create_vector(&offsets);
            let mut radar_vector_builder = RadarVector2DBuilder::new(&mut fbb);
            radar_vector_builder.add_data(data_offsets);
            let radar_vector_buffer = radar_vector_builder.finish();
            fbb.finish_minimal(radar_vector_buffer);
            let now = Utc::now();
            let key = format!("{}/{}", cli.sensor_name(), now.to_rfc3339());
            let data_uncompressed = fbb.finished_data();
            // Try to compress and upload the data to s3, returning an error on upload failure
            match archiver::upload_object_zstd(data_uncompressed, &client, cli.bucket_name(), &key)
                .await
            {
                Ok(_) => event!(
                    Level::DEBUG,
                    "Uploaded key {} to bucket {}",
                    key,
                    cli.bucket_name()
                ),
                Err(e) => return Err(ArchiveError::S3Error(e)),
            };
            // Commit offsets only after a successful upload: a crash between upload and commit can
            // produce a duplicate archive, but never a lost one (at-least-once archiving)
            if let Err(e) = consumer.consumer.commit_consumer_state(CommitMode::Sync) {
                event!(Level::ERROR, "Failed to commit consumer offset. This may result in duplicate archives in archival storage. {}", e);
                return Err(ArchiveError::KafkaError(e));
            };
            event!(Level::INFO, count = chunk_counter, timestamp = ?now, position = ?consumer.consumer.position().unwrap());
            chunk_counter = 0;
        }
    }
    Ok(())
}