1use async_trait::async_trait;
2use futures::sink::{Sink, SinkExt};
3use std::fmt::Debug;
4
5use crate::error::{ErrorInfo, PgWireError, PgWireResult};
6use crate::messages::copy::{
7 CopyBothResponse, CopyData, CopyDone, CopyFail, CopyInResponse, CopyOutResponse,
8};
9use crate::messages::PgWireBackendMessage;
10
11use super::results::CopyResponse;
12use super::ClientInfo;
13
14#[async_trait]
16pub trait CopyHandler: Send + Sync {
17 async fn on_copy_data<C>(&self, _client: &mut C, _copy_data: CopyData) -> PgWireResult<()>
18 where
19 C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
20 C::Error: Debug,
21 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>;
22
23 async fn on_copy_done<C>(&self, _client: &mut C, _done: CopyDone) -> PgWireResult<()>
24 where
25 C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
26 C::Error: Debug,
27 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>;
28
29 async fn on_copy_fail<C>(&self, _client: &mut C, fail: CopyFail) -> PgWireError
30 where
31 C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
32 C::Error: Debug,
33 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
34 {
35 PgWireError::UserError(Box::new(ErrorInfo::new(
36 "ERROR".to_owned(),
37 "XX000".to_owned(),
38 format!("COPY IN mode terminated by the user: {}", fail.message),
39 )))
40 }
41}
42
43pub async fn send_copy_in_response<C>(client: &mut C, resp: CopyResponse) -> PgWireResult<()>
44where
45 C: Sink<PgWireBackendMessage> + Unpin,
46 C::Error: Debug,
47 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
48{
49 let resp = CopyInResponse::new(resp.format, resp.columns as i16, resp.column_formats);
50 client
51 .send(PgWireBackendMessage::CopyInResponse(resp))
52 .await?;
53 Ok(())
54}
55
56pub async fn send_copy_out_response<C>(client: &mut C, resp: CopyResponse) -> PgWireResult<()>
57where
58 C: Sink<PgWireBackendMessage> + Unpin,
59 C::Error: Debug,
60 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
61{
62 let resp = CopyOutResponse::new(resp.format, resp.columns as i16, resp.column_formats);
63 client
64 .send(PgWireBackendMessage::CopyOutResponse(resp))
65 .await?;
66 Ok(())
67}
68
69pub async fn send_copy_both_response<C>(client: &mut C, resp: CopyResponse) -> PgWireResult<()>
70where
71 C: Sink<PgWireBackendMessage> + Unpin,
72 C::Error: Debug,
73 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
74{
75 let resp = CopyBothResponse::new(resp.format, resp.columns as i16, resp.column_formats);
76 client
77 .send(PgWireBackendMessage::CopyBothResponse(resp))
78 .await?;
79 Ok(())
80}
81
82#[async_trait]
83impl CopyHandler for super::NoopHandler {
84 async fn on_copy_data<C>(&self, _client: &mut C, _copy_data: CopyData) -> PgWireResult<()>
85 where
86 C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
87 C::Error: Debug,
88 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
89 {
90 Err(PgWireError::UserError(Box::new(ErrorInfo::new(
91 "FATAL".to_owned(),
92 "08P01".to_owned(),
93 "This feature is not implemented.".to_string(),
94 ))))
95 }
96
97 async fn on_copy_done<C>(&self, _client: &mut C, _done: CopyDone) -> PgWireResult<()>
98 where
99 C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
100 C::Error: Debug,
101 PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
102 {
103 Err(PgWireError::UserError(Box::new(ErrorInfo::new(
104 "FATAL".to_owned(),
105 "08P01".to_owned(),
106 "This feature is not implemented.".to_string(),
107 ))))
108 }
109}