1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
use std::borrow::BorrowMut; use rlink::core; use rlink::core::element::Record; use rlink::core::function::{CoProcessFunction, Context}; use rlink_connector_kafka::KafkaRecord; use crate::ip_mapping_config::{update_ip_mapping_by_id, IpMappingItem}; #[derive(Debug, Function)] pub struct IpMappingCoProcessFunction {} impl IpMappingCoProcessFunction { pub fn new() -> Self { IpMappingCoProcessFunction {} } } impl CoProcessFunction for IpMappingCoProcessFunction { fn open(&mut self, _context: &Context) -> core::Result<()> { Ok(()) } fn process_left(&mut self, record: Record) -> Box<dyn Iterator<Item = Record>> { Box::new(vec![record].into_iter()) } fn process_right( &mut self, stream_seq: usize, mut record: Record, ) -> Box<dyn Iterator<Item = Record>> { let kafka_record = KafkaRecord::new(record.borrow_mut()); let payload = match kafka_record.get_kafka_payload() { Ok(payload) => payload, _ => return Box::new(vec![].into_iter()), }; let line = match String::from_utf8(payload.to_vec()) { Ok(line) => line, _ => return Box::new(vec![].into_iter()), }; info!( "ip mapping config update:stream_seq={},value={}", stream_seq, line ); match update_ip_mapping(line) { Ok(()) => {} Err(e) => error!("update ip mapping error.{}", e), } Box::new(vec![].into_iter()) } fn close(&mut self) -> core::Result<()> { Ok(()) } } fn update_ip_mapping(context: String) -> serde_json::Result<()> { let response: IpMappingChangeResponse = serde_json::from_str(context.as_str())?; let is_del = response.change_type.eq("delete"); update_ip_mapping_by_id(response.info, is_del); Ok(()) } #[derive(Clone, Debug, Serialize, Deserialize)] struct IpMappingChangeResponse { #[serde(rename = "changeType")] change_type: String, info: IpMappingItem, }