// Configuration types for the Laserstream client: compression encodings,
// gRPC channel options, and the top-level connection config.
use serde::{Deserialize, Serialize};
/// Compression encodings selectable in `ChannelOptions`.
///
/// Because of `#[serde(rename_all = "lowercase")]`, the variants are
/// (de)serialized as the strings `"gzip"` and `"zstd"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionEncoding {
/// Gzip compression.
Gzip,
/// Zstd (Zstandard) compression.
Zstd,
}
/// Connection settings for a Laserstream client.
///
/// `Debug` is implemented by hand instead of being derived so that the API key
/// is never written to logs, panic messages, or test output. All other fields
/// are printed exactly as a derived implementation would print them.
#[derive(Clone)]
pub struct LaserstreamConfig {
    /// API Key for authentication.
    pub api_key: String,
    /// The Laserstream endpoint URL.
    pub endpoint: String,
    /// Maximum number of reconnection attempts. Defaults to 10.
    /// A hard cap of 240 attempts (20 minutes / 5 seconds) is enforced internally.
    pub max_reconnect_attempts: Option<u32>,
    /// gRPC channel options.
    pub channel_options: ChannelOptions,
    /// When true, enable replay on reconnects (uses from_slot and internal slot tracking).
    /// When false, no replay - start from current slot on reconnects.
    /// Default: true
    pub replay: bool,
}

impl std::fmt::Debug for LaserstreamConfig {
    /// Formats the config like a derived `Debug`, but with the API key masked.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LaserstreamConfig")
            // The key is a credential: show that the field exists, never its value.
            .field("api_key", &format_args!("<redacted>"))
            .field("endpoint", &self.endpoint)
            .field("max_reconnect_attempts", &self.max_reconnect_attempts)
            .field("channel_options", &self.channel_options)
            .field("replay", &self.replay)
            .finish()
    }
}
/// Optional tuning parameters for the gRPC channel.
///
/// Every field is an `Option`; `None` leaves the setting unspecified. The
/// "Default" quoted on each field is the documented fallback value. It is not
/// applied by this type itself — presumably it is filled in where the channel
/// is built (confirm against the connection code).
#[derive(Debug, Clone)]
pub struct ChannelOptions {
/// Connect timeout, in seconds. Default: 10
pub connect_timeout_secs: Option<u64>,
/// Request timeout, in seconds. Default: 30
pub timeout_secs: Option<u64>,
/// Maximum size of a received (decoded) message, in bytes. Default: 1GB
pub max_decoding_message_size: Option<usize>,
/// Maximum size of a sent (encoded) message, in bytes. Default: 32MB
pub max_encoding_message_size: Option<usize>,
/// HTTP/2 keep-alive interval, in seconds. Default: 30
pub http2_keep_alive_interval_secs: Option<u64>,
/// Keep-alive timeout, in seconds. Default: 5
pub keep_alive_timeout_secs: Option<u64>,
/// Whether keep-alive is enabled while the connection is idle. Default: true
pub keep_alive_while_idle: Option<bool>,
/// Initial stream window size, in bytes. Default: 4MB
pub initial_stream_window_size: Option<u32>,
/// Initial connection window size, in bytes. Default: 8MB
pub initial_connection_window_size: Option<u32>,
/// Whether the HTTP/2 adaptive window is enabled. Default: true
pub http2_adaptive_window: Option<bool>,
/// Whether TCP no-delay is enabled. Default: true
pub tcp_nodelay: Option<bool>,
/// TCP keep-alive interval, in seconds. Default: 60
pub tcp_keepalive_secs: Option<u64>,
/// Buffer size, in bytes. Default: 64KB
pub buffer_size: Option<usize>,
/// Compression encodings to accept from the server. Default: ["gzip", "zstd"]
pub accept_compression: Option<Vec<CompressionEncoding>>,
/// Compression encoding to use when sending. Default: None
pub send_compression: Option<CompressionEncoding>,
}
impl Default for ChannelOptions {
fn default() -> Self {
Self {
connect_timeout_secs: None,
timeout_secs: None,
max_decoding_message_size: None,
max_encoding_message_size: None,
http2_keep_alive_interval_secs: None,
keep_alive_timeout_secs: None,
keep_alive_while_idle: None,
initial_stream_window_size: None,
initial_connection_window_size: None,
http2_adaptive_window: None,
tcp_nodelay: None,
tcp_keepalive_secs: None,
buffer_size: None,
accept_compression: None,
send_compression: None,
}
}
}
impl ChannelOptions {
/// Enable zstd compression for both sending and receiving
pub fn with_zstd_compression(mut self) -> Self {
self.send_compression = Some(CompressionEncoding::Zstd);
self.accept_compression = Some(vec![CompressionEncoding::Zstd, CompressionEncoding::Gzip]);
self
}
/// Enable gzip compression for both sending and receiving
pub fn with_gzip_compression(mut self) -> Self {
self.send_compression = Some(CompressionEncoding::Gzip);
self.accept_compression = Some(vec![CompressionEncoding::Gzip, CompressionEncoding::Zstd]);
self
}
}
impl Default for LaserstreamConfig {
fn default() -> Self {
Self {
api_key: String::new(),
endpoint: String::new(),
max_reconnect_attempts: None, // Default to None
channel_options: ChannelOptions::default(),
replay: true, // Default to true
}
}
}
impl LaserstreamConfig {
    /// Creates a config for `endpoint` authenticated with `api_key`.
    ///
    /// The remaining settings start out as: no explicit reconnect limit,
    /// default channel options, and replay enabled.
    pub fn new(endpoint: String, api_key: String) -> Self {
        let channel_options = ChannelOptions::default();
        Self {
            api_key,
            endpoint,
            channel_options,
            max_reconnect_attempts: None,
            replay: true,
        }
    }

    /// Sets the maximum number of reconnection attempts.
    ///
    /// The value is stored as given; no clamping happens in this method.
    pub fn with_max_reconnect_attempts(self, attempts: u32) -> Self {
        Self {
            max_reconnect_attempts: Some(attempts),
            ..self
        }
    }

    /// Replaces the channel options wholesale with `options`.
    pub fn with_channel_options(self, options: ChannelOptions) -> Self {
        Self {
            channel_options: options,
            ..self
        }
    }

    /// Sets replay behavior on reconnects.
    ///
    /// `true` (the default) replays using from_slot and internal slot tracking;
    /// `false` starts from the current slot on reconnects (no replay).
    pub fn with_replay(self, replay: bool) -> Self {
        Self { replay, ..self }
    }
}