cmdb_ip_mapping/
ip_mapping_connect.rs1use rlink::core;
2use rlink::core::element::{Record, FnSchema};
3use rlink::core::function::{CoProcessFunction, Context};
4use rlink_connector_kafka::buffer_gen::kafka_message;
5
6use crate::ip_mapping_config::{update_ip_mapping_by_id, IpMappingItem};
7
8#[derive(Debug, Function)]
9pub struct IpMappingCoProcessFunction {
10 fn_schema: FnSchema
11}
12
13impl IpMappingCoProcessFunction {
14 pub fn new(fn_schema: FnSchema) -> Self {
15 IpMappingCoProcessFunction {
16 fn_schema
17 }
18 }
19}
20
21impl CoProcessFunction for IpMappingCoProcessFunction {
22 fn open(&mut self, _context: &Context) -> core::Result<()> {
23 Ok(())
24 }
25
26 fn process_left(&mut self, record: Record) -> Box<dyn Iterator<Item = Record>> {
27 Box::new(vec![record].into_iter())
28 }
29
30 fn process_right(
31 &mut self,
32 stream_seq: usize,
33 mut record: Record,
34 ) -> Box<dyn Iterator<Item = Record>> {
35 let payload = match kafka_message::Entity::parse(record.as_buffer()) {
36 Ok(entity) => entity.payload,
37 _ => {
38 return Box::new(vec![].into_iter());
39 }
40 };
41 let line = match String::from_utf8(payload.to_vec()) {
42 Ok(line) => line,
43 _ => return Box::new(vec![].into_iter()),
44 };
45 info!(
46 "ip mapping config update:stream_seq={},value={}",
47 stream_seq, line
48 );
49 match update_ip_mapping(line) {
50 Ok(()) => {}
51 Err(e) => error!("update ip mapping error.{}", e),
52 }
53 Box::new(vec![].into_iter())
54 }
55
56 fn close(&mut self) -> core::Result<()> {
57 Ok(())
58 }
59
60 fn schema(&self, _input_schema: FnSchema) -> FnSchema {
61 self.fn_schema.clone()
62 }
63}
64
65fn update_ip_mapping(context: String) -> serde_json::Result<()> {
66 let response: IpMappingChangeResponse = serde_json::from_str(context.as_str())?;
67 let is_del = response.change_type.eq("delete");
68 update_ip_mapping_by_id(response.info, is_del);
69 Ok(())
70}
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73struct IpMappingChangeResponse {
74 #[serde(rename = "changeType")]
75 change_type: String,
76 info: IpMappingItem,
77}