pgwire/api/
copy.rs

1use async_trait::async_trait;
2use futures::sink::{Sink, SinkExt};
3use std::fmt::Debug;
4
5use crate::error::{ErrorInfo, PgWireError, PgWireResult};
6use crate::messages::copy::{
7    CopyBothResponse, CopyData, CopyDone, CopyFail, CopyInResponse, CopyOutResponse,
8};
9use crate::messages::PgWireBackendMessage;
10
11use super::results::CopyResponse;
12use super::ClientInfo;
13
14/// handler for copy messages
15#[async_trait]
16pub trait CopyHandler: Send + Sync {
17    async fn on_copy_data<C>(&self, _client: &mut C, _copy_data: CopyData) -> PgWireResult<()>
18    where
19        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
20        C::Error: Debug,
21        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>;
22
23    async fn on_copy_done<C>(&self, _client: &mut C, _done: CopyDone) -> PgWireResult<()>
24    where
25        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
26        C::Error: Debug,
27        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>;
28
29    async fn on_copy_fail<C>(&self, _client: &mut C, fail: CopyFail) -> PgWireError
30    where
31        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
32        C::Error: Debug,
33        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
34    {
35        PgWireError::UserError(Box::new(ErrorInfo::new(
36            "ERROR".to_owned(),
37            "XX000".to_owned(),
38            format!("COPY IN mode terminated by the user: {}", fail.message),
39        )))
40    }
41}
42
43pub async fn send_copy_in_response<C>(client: &mut C, resp: CopyResponse) -> PgWireResult<()>
44where
45    C: Sink<PgWireBackendMessage> + Unpin,
46    C::Error: Debug,
47    PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
48{
49    let resp = CopyInResponse::new(resp.format, resp.columns as i16, resp.column_formats);
50    client
51        .send(PgWireBackendMessage::CopyInResponse(resp))
52        .await?;
53    Ok(())
54}
55
56pub async fn send_copy_out_response<C>(client: &mut C, resp: CopyResponse) -> PgWireResult<()>
57where
58    C: Sink<PgWireBackendMessage> + Unpin,
59    C::Error: Debug,
60    PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
61{
62    let resp = CopyOutResponse::new(resp.format, resp.columns as i16, resp.column_formats);
63    client
64        .send(PgWireBackendMessage::CopyOutResponse(resp))
65        .await?;
66    Ok(())
67}
68
69pub async fn send_copy_both_response<C>(client: &mut C, resp: CopyResponse) -> PgWireResult<()>
70where
71    C: Sink<PgWireBackendMessage> + Unpin,
72    C::Error: Debug,
73    PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
74{
75    let resp = CopyBothResponse::new(resp.format, resp.columns as i16, resp.column_formats);
76    client
77        .send(PgWireBackendMessage::CopyBothResponse(resp))
78        .await?;
79    Ok(())
80}
81
82#[async_trait]
83impl CopyHandler for super::NoopHandler {
84    async fn on_copy_data<C>(&self, _client: &mut C, _copy_data: CopyData) -> PgWireResult<()>
85    where
86        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
87        C::Error: Debug,
88        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
89    {
90        Err(PgWireError::UserError(Box::new(ErrorInfo::new(
91            "FATAL".to_owned(),
92            "08P01".to_owned(),
93            "This feature is not implemented.".to_string(),
94        ))))
95    }
96
97    async fn on_copy_done<C>(&self, _client: &mut C, _done: CopyDone) -> PgWireResult<()>
98    where
99        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
100        C::Error: Debug,
101        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
102    {
103        Err(PgWireError::UserError(Box::new(ErrorInfo::new(
104            "FATAL".to_owned(),
105            "08P01".to_owned(),
106            "This feature is not implemented.".to_string(),
107        ))))
108    }
109}