greptime-proto 0.1.0

Rust protobuf bindings for GreptimeDB APIs.
Documentation
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

include!("../generated/greptime.v1.meta.rs");

mod mailbox;

use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::Hash;

pub const PROTOCOL_VERSION: u64 = 1;

#[derive(Default)]
pub struct PeerDict {
    peers: HashMap<Peer, usize>,
    index: usize,
}

impl PeerDict {
    pub fn get_or_insert(&mut self, peer: Peer) -> usize {
        let index = self.peers.entry(peer).or_insert_with(|| {
            let v = self.index;
            self.index += 1;
            v
        });

        *index
    }

    pub fn into_peers(self) -> Vec<Peer> {
        let mut array = vec![Peer::default(); self.index];
        for (p, i) in self.peers {
            array[i] = p;
        }
        array
    }
}

impl Display for Peer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "peer-{}({})", self.id, self.addr)
    }
}

impl Peer {
    pub fn new(id: u64, addr: impl Into<String>) -> Self {
        Self {
            id,
            addr: addr.into(),
        }
    }
    pub fn empty(id: u64) -> Self {
        Self {
            id,
            addr: String::new(),
        }
    }
}

impl RequestHeader {
    #[inline]
    pub fn new(member_id: u64, role: Role, tracing_context: HashMap<String, String>) -> Self {
        Self {
            protocol_version: PROTOCOL_VERSION,
            member_id,
            role: role.into(),
            tracing_context,
        }
    }
}

impl ResponseHeader {
    #[inline]
    pub fn success() -> Self {
        Self {
            protocol_version: PROTOCOL_VERSION,
            ..Default::default()
        }
    }

    #[inline]
    pub fn failed(error: Error) -> Self {
        Self {
            protocol_version: PROTOCOL_VERSION,
            error: Some(error),
        }
    }

    #[inline]
    pub fn is_not_leader(&self) -> bool {
        if let Some(error) = &self.error {
            if error.code == ErrorCode::NotLeader as i32 {
                return true;
            }
        }
        false
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCode {
    NotEnoughAvailableDatanodes = 1,
    NotLeader = 2,
}

impl Error {
    #[inline]
    pub fn not_enough_available_datanodes(expected: usize, actual: usize) -> Self {
        Self {
            code: ErrorCode::NotEnoughAvailableDatanodes as i32,
            err_msg: format!(
                "There are not enough active datanodes, expected: {expected}, actual: {actual}."
            ),
        }
    }

    #[inline]
    pub fn is_not_leader() -> Self {
        Self {
            code: ErrorCode::NotLeader as i32,
            err_msg: "Current server is not leader".to_string(),
        }
    }
}

impl HeartbeatResponse {
    #[inline]
    pub fn is_not_leader(&self) -> bool {
        if let Some(header) = &self.header {
            return header.is_not_leader();
        }
        false
    }
}
macro_rules! gen_set_header {
    ($req: ty) => {
        impl $req {
            #[inline]
            pub fn set_header(
                &mut self,
                member_id: u64,
                role: Role,
                tracing_context: HashMap<String, String>,
            ) {
                match self.header.as_mut() {
                    Some(header) => {
                        header.member_id = member_id;
                        header.role = role.into();
                        header.tracing_context = tracing_context;
                    }
                    None => {
                        self.header = Some(RequestHeader::new(member_id, role, tracing_context));
                    }
                }
            }
        }
    };
}

gen_set_header!(HeartbeatRequest);
gen_set_header!(RangeRequest);
gen_set_header!(PutRequest);
gen_set_header!(BatchGetRequest);
gen_set_header!(BatchPutRequest);
gen_set_header!(BatchDeleteRequest);
gen_set_header!(CompareAndPutRequest);
gen_set_header!(DeleteRangeRequest);
gen_set_header!(LockRequest);
gen_set_header!(UnlockRequest);
gen_set_header!(DdlTaskRequest);
gen_set_header!(MigrateRegionRequest);
gen_set_header!(QueryProcedureRequest);
gen_set_header!(ProcedureDetailRequest);
gen_set_header!(ReconcileRequest);

#[cfg(test)]
mod tests {
    use std::vec;

    use super::*;

    #[test]
    fn test_peer_dict() {
        let mut dict = PeerDict::default();

        dict.get_or_insert(Peer {
            id: 1,
            addr: "111".to_string(),
        });
        dict.get_or_insert(Peer {
            id: 2,
            addr: "222".to_string(),
        });
        dict.get_or_insert(Peer {
            id: 1,
            addr: "111".to_string(),
        });
        dict.get_or_insert(Peer {
            id: 1,
            addr: "111".to_string(),
        });
        dict.get_or_insert(Peer {
            id: 1,
            addr: "111".to_string(),
        });
        dict.get_or_insert(Peer {
            id: 1,
            addr: "111".to_string(),
        });
        dict.get_or_insert(Peer {
            id: 2,
            addr: "222".to_string(),
        });

        assert_eq!(2, dict.index);
        assert_eq!(
            vec![
                Peer {
                    id: 1,
                    addr: "111".to_string(),
                },
                Peer {
                    id: 2,
                    addr: "222".to_string(),
                }
            ],
            dict.into_peers()
        );
    }
}