pub struct ReplicationClient { /* private fields */ }Expand description
PostgreSQL logical replication client.
This client spawns a background worker task that maintains the replication connection and streams events to the consumer via a bounded channel.
§Example
use pgwire_replication::client::{ReplicationClient, ReplicationEvent};
use pgwire_replication::config::ReplicationConfig;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ReplicationConfig::new(
"localhost",
"postgres",
"password",
"mydb",
"my_slot",
"my_pub",
);
let mut client = ReplicationClient::connect(config).await?;
while let Some(ev) = client.recv().await? {
match ev {
ReplicationEvent::XLogData { data, wal_end, .. } => {
process_change(&data);
client.update_applied_lsn(wal_end);
}
ReplicationEvent::KeepAlive { .. } => {}
ReplicationEvent::StoppedAt { reached } => {
println!("Reached stop LSN: {reached}");
break;
}
_ => {}
}
}
Ok(())
}
fn process_change(_data: &bytes::Bytes) {
// user-defined
}Implementations§
Source§impl ReplicationClient
impl ReplicationClient
Sourcepub async fn connect(cfg: ReplicationConfig) -> Result<Self>
pub async fn connect(cfg: ReplicationConfig) -> Result<Self>
Connect to PostgreSQL and start streaming replication events.
This establishes a TCP connection (optionally upgrading to TLS),
authenticates, and starts the replication stream. Events are buffered
in a channel of size config.buffer_events.
§Errors
Returns an error if:
- TCP connection fails
- TLS handshake fails (when enabled)
- Authentication fails
- Replication slot doesn’t exist
- Publication doesn’t exist
Sourcepub async fn recv(&mut self) -> Result<Option<ReplicationEvent>>
pub async fn recv(&mut self) -> Result<Option<ReplicationEvent>>
Receive the next replication event.
Ok(Some(event))=> received an eventOk(None)=> replication ended normally (stop requested or stop_at_lsn reached)Err(e)=> replication ended abnormally
Sourcepub fn update_applied_lsn(&self, lsn: Lsn)
pub fn update_applied_lsn(&self, lsn: Lsn)
Update the applied/durable LSN reported to the server.
Semantics: call this only once you have durably persisted all events up to lsn.
This update is monotonic and cheap; wire feedback is still governed by the worker’s
status_interval and keepalive reply requests.
Sourcepub fn stop(&self)
pub fn stop(&self)
Request the worker to stop gracefully.
After calling this, recv() will return remaining buffered
events, then Ok(None) once the worker exits cleanly.
This sends a CopyDone message to the server to cleanly terminate the replication stream.
pub fn is_running(&self) -> bool
Sourcepub async fn join(self) -> Result<()>
pub async fn join(self) -> Result<()>
Wait for the worker task to complete and return its result.
This consumes the client. Use this for diagnostics or to ensure
clean shutdown after calling stop().