1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use kafkang::client::{FetchPartition, KafkaClient};
/// Demonstrates the low-level API for fetching messages.
///
/// The program talks to a single hard-coded broker: it loads metadata,
/// verifies that the target topic is known to the client, issues one fetch
/// request for a single topic/partition/offset and prints the offset and
/// payload length of every message that comes back. All diagnostics,
/// including failures, are written to stdout.
///
/// Please look at examples/consume.rs for an easier to use API.
fn main() {
    tracing_subscriber::fmt::init();

    // ~ fixed target of this demo
    let broker_addr = "localhost:9092";
    let topic_name = "my-topic";
    let partition_id = 0;
    let start_offset = 0;

    println!(
        "About to fetch messages at {} from: {} (partition {}, offset {}) ",
        broker_addr, topic_name, partition_id, start_offset
    );

    let mut kafka = KafkaClient::new(vec![String::from(broker_addr)]);
    if let Err(err) = kafka.load_metadata_all() {
        println!("Failed to load metadata from {}: {}", broker_addr, err);
        return;
    }

    // ~ print a warning and stop when the target topic does not yet exist
    let topic_known = kafka.topics().contains(topic_name);
    if !topic_known {
        println!("No such topic at {}: {}", broker_addr, topic_name);
        return;
    }

    // ~ a single-element request: one topic, one partition, one offset
    let request = [FetchPartition::new(topic_name, partition_id, start_offset)];
    let responses = match kafka.fetch_messages(&request) {
        Ok(responses) => responses,
        Err(err) => {
            println!("Failed to fetch messages: {}", err);
            return;
        }
    };

    for response in responses {
        for topic_resp in response.topics() {
            for part in topic_resp.partitions() {
                // ~ a failed partition is reported and skipped; the
                // remaining partitions are still printed
                let data = match part.data() {
                    Ok(data) => data,
                    Err(err) => {
                        println!(
                            "partition error: {}:{}: {}",
                            topic_resp.topic(),
                            part.partition(),
                            err
                        );
                        continue;
                    }
                };

                println!(
                    "topic: {} / partition: {} / latest available message \
                     offset: {}",
                    topic_resp.topic(),
                    part.partition(),
                    data.highwatermark_offset()
                );

                for message in data.messages() {
                    println!(
                        "topic: {} / partition: {} / message.offset: {} / \
                         message.len: {}",
                        topic_resp.topic(),
                        part.partition(),
                        message.offset,
                        message.value.len()
                    );
                }
            }
        }
    }
}