cmdb_ip_mapping/
ip_mapping_connect.rs

1use rlink::core;
2use rlink::core::element::{Record, FnSchema};
3use rlink::core::function::{CoProcessFunction, Context};
4use rlink_connector_kafka::buffer_gen::kafka_message;
5
6use crate::ip_mapping_config::{update_ip_mapping_by_id, IpMappingItem};
7
8#[derive(Debug, Function)]
9pub struct IpMappingCoProcessFunction {
10    fn_schema: FnSchema
11}
12
13impl IpMappingCoProcessFunction {
14    pub fn new(fn_schema: FnSchema) -> Self {
15        IpMappingCoProcessFunction {
16            fn_schema
17        }
18    }
19}
20
21impl CoProcessFunction for IpMappingCoProcessFunction {
22    fn open(&mut self, _context: &Context) -> core::Result<()> {
23        Ok(())
24    }
25
26    fn process_left(&mut self, record: Record) -> Box<dyn Iterator<Item = Record>> {
27        Box::new(vec![record].into_iter())
28    }
29
30    fn process_right(
31        &mut self,
32        stream_seq: usize,
33        mut record: Record,
34    ) -> Box<dyn Iterator<Item = Record>> {
35        let payload = match kafka_message::Entity::parse(record.as_buffer()) {
36            Ok(entity) => entity.payload,
37            _ => {
38                return Box::new(vec![].into_iter());
39            }
40        };
41        let line = match String::from_utf8(payload.to_vec()) {
42            Ok(line) => line,
43            _ => return Box::new(vec![].into_iter()),
44        };
45        info!(
46            "ip mapping config update:stream_seq={},value={}",
47            stream_seq, line
48        );
49        match update_ip_mapping(line) {
50            Ok(()) => {}
51            Err(e) => error!("update ip mapping error.{}", e),
52        }
53        Box::new(vec![].into_iter())
54    }
55
56    fn close(&mut self) -> core::Result<()> {
57        Ok(())
58    }
59
60    fn schema(&self, _input_schema: FnSchema) -> FnSchema {
61        self.fn_schema.clone()
62    }
63}
64
65fn update_ip_mapping(context: String) -> serde_json::Result<()> {
66    let response: IpMappingChangeResponse = serde_json::from_str(context.as_str())?;
67    let is_del = response.change_type.eq("delete");
68    update_ip_mapping_by_id(response.info, is_del);
69    Ok(())
70}
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73struct IpMappingChangeResponse {
74    #[serde(rename = "changeType")]
75    change_type: String,
76    info: IpMappingItem,
77}