Skip to main content

fluss/rpc/message/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::rpc::api_key::ApiKey;
19use crate::rpc::api_version::ApiVersion;
20use crate::rpc::frame::{ReadError, WriteError};
21use bytes::{Buf, BufMut};
22
23mod authenticate;
24mod create_database;
25mod create_partition;
26mod create_table;
27mod database_exists;
28mod drop_database;
29mod drop_partition;
30mod drop_table;
31mod fetch;
32mod get_database_info;
33mod get_latest_lake_snapshot;
34mod get_security_token;
35mod get_table;
36mod header;
37mod init_writer;
38mod limit_scan;
39mod list_databases;
40mod list_offsets;
41mod list_partition_infos;
42mod list_tables;
43mod lookup;
44mod produce_log;
45mod put_kv;
46mod table_exists;
47mod update_metadata;
48
49pub use crate::rpc::RpcError;
50pub use authenticate::*;
51pub use create_database::*;
52pub use create_partition::*;
53pub use create_table::*;
54pub use database_exists::*;
55pub use drop_database::*;
56pub use drop_partition::*;
57pub use drop_table::*;
58pub use fetch::*;
59pub use get_database_info::*;
60pub use get_latest_lake_snapshot::*;
61pub use get_security_token::*;
62pub use get_table::*;
63pub use header::*;
64pub use init_writer::*;
65pub use limit_scan::*;
66pub use list_databases::*;
67pub use list_offsets::*;
68pub use list_partition_infos::*;
69pub use list_tables::*;
70pub use lookup::*;
71pub use produce_log::*;
72pub use put_kv::*;
73pub use table_exists::*;
74pub use update_metadata::*;
75
76pub trait RequestBody {
77    type ResponseBody;
78
79    const API_KEY: ApiKey;
80
81    const REQUEST_VERSION: ApiVersion;
82}
83
84impl<T: RequestBody> RequestBody for &T {
85    type ResponseBody = T::ResponseBody;
86
87    const API_KEY: ApiKey = T::API_KEY;
88
89    const REQUEST_VERSION: ApiVersion = T::REQUEST_VERSION;
90}
91
92pub trait WriteVersionedType<W>: Sized
93where
94    W: BufMut,
95{
96    fn write_versioned(&self, writer: &mut W, version: ApiVersion) -> Result<(), WriteError>;
97}
98
99pub trait ReadVersionedType<R>: Sized
100where
101    R: Buf,
102{
103    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadError>;
104}
105
106#[macro_export]
107macro_rules! impl_write_version_type {
108    ($type:ty) => {
109        impl<W> WriteVersionedType<W> for $type
110        where
111            W: BufMut,
112        {
113            fn write_versioned(
114                &self,
115                writer: &mut W,
116                _version: ApiVersion,
117            ) -> Result<(), WriteError> {
118                Ok(self.inner_request.encode(writer).unwrap())
119            }
120        }
121    };
122}
123
124#[macro_export]
125macro_rules! impl_read_version_type {
126    ($type:ty) => {
127        impl<R> ReadVersionedType<R> for $type
128        where
129            R: Buf,
130        {
131            fn read_versioned(reader: &mut R, _version: ApiVersion) -> Result<Self, ReadError> {
132                Ok(<$type>::decode(reader).unwrap())
133            }
134        }
135    };
136}