agp_datapath/tables/
subscription_table.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{HashMap, HashSet};
5use std::fmt::{Display, Formatter};
6
7use parking_lot::{RawRwLock, RwLock, lock_api::RwLockWriteGuard};
8use rand::Rng;
9use tracing::{debug, error, warn};
10
11use super::pool::Pool;
12use super::{SubscriptionTable, errors::SubscriptionTableError};
13use crate::messages::encoder::DEFAULT_AGENT_ID;
14use crate::messages::{Agent, AgentType};
15
16#[derive(Debug, Default, Clone)]
17struct ConnId {
18    conn_id: u64,   // connection id
19    counter: usize, // number of references
20}
21
22impl ConnId {
23    fn new(conn_id: u64) -> Self {
24        ConnId {
25            conn_id,
26            counter: 1,
27        }
28    }
29}
30
31#[derive(Debug)]
32struct Connections {
33    // map from connection id to the position in the connections pool
34    // this is used in the insertion/remove
35    index: HashMap<u64, usize>,
36    // pool of all connections ids that can to be used in the match
37    pool: Pool<ConnId>,
38}
39
40impl Default for Connections {
41    fn default() -> Self {
42        Connections {
43            index: HashMap::new(),
44            pool: Pool::with_capacity(2),
45        }
46    }
47}
48
49impl Connections {
50    fn insert(&mut self, conn: u64) {
51        match self.index.get(&conn) {
52            None => {
53                let conn_id = ConnId::new(conn);
54                let pos = self.pool.insert(conn_id);
55                self.index.insert(conn, pos);
56            }
57            Some(pos) => match self.pool.get_mut(*pos) {
58                None => {
59                    error!("error retrieving the connection from the pool");
60                }
61                Some(conn_id) => {
62                    conn_id.counter += 1;
63                }
64            },
65        }
66    }
67
68    fn remove(&mut self, conn: u64) -> Result<(), SubscriptionTableError> {
69        let conn_index_opt = self.index.get(&conn);
70        if conn_index_opt.is_none() {
71            error!("cannot find the index for connection {}", conn);
72            return Err(SubscriptionTableError::ConnectionIdNotFound);
73        }
74        let conn_index = conn_index_opt.unwrap();
75        let conn_id_opt = self.pool.get_mut(*conn_index);
76        if conn_id_opt.is_none() {
77            error!("cannot find the connection {} in the pool", conn);
78            return Err(SubscriptionTableError::ConnectionIdNotFound);
79        }
80        let conn_id = conn_id_opt.unwrap();
81        if conn_id.counter == 1 {
82            // remove connection
83            self.pool.remove(*conn_index);
84            self.index.remove(&conn);
85        } else {
86            conn_id.counter -= 1;
87        }
88        Ok(())
89    }
90
91    fn get_one(&self, except_conn: u64) -> Option<u64> {
92        if self.index.len() == 1 {
93            if self.index.contains_key(&except_conn) {
94                debug!("the only available connection cannot be used");
95                return None;
96            } else {
97                let val = self.index.iter().next().unwrap();
98                return Some(*val.0);
99            }
100        }
101
102        // we need to iterate and find a value starting from a random point in the pool
103        let mut rng = rand::rng();
104        let index = rng.random_range(0..self.pool.max_set() + 1);
105        let mut stop = false;
106        let mut i = index;
107        while !stop {
108            let opt = self.pool.get(i);
109            if opt.is_some() {
110                let out = opt.unwrap().conn_id;
111                if out != except_conn {
112                    return Some(out);
113                }
114            }
115            i = (i + 1) % (self.pool.max_set() + 1);
116            if i == index {
117                stop = true;
118            }
119        }
120        debug!("no output connection available");
121        None
122    }
123
124    fn get_all(&self, except_conn: u64) -> Option<Vec<u64>> {
125        if self.index.len() == 1 {
126            if self.index.contains_key(&except_conn) {
127                debug!("the only available connection cannot be used");
128                return None;
129            } else {
130                let val = self.index.iter().next().unwrap();
131                return Some(vec![*val.0]);
132            }
133        }
134        let mut out = Vec::new();
135        for val in self.index.iter() {
136            if *val.0 != except_conn {
137                out.push(*val.0);
138            }
139        }
140        if out.is_empty() {
141            debug!("no output connection available");
142            None
143        } else {
144            Some(out)
145        }
146    }
147}
148
149#[derive(Debug, Default)]
150struct AgentTypeState {
151    // map agent id -> [local connection ids, remote connection ids]
152    // the array contains the local connections at position 0 and the
153    // remote ones at position 1
154    // the number of connections per agent id is expected to be small
155    ids: HashMap<u64, [Vec<u64>; 2]>,
156    // List of all the connections that are available for this agent type
157    // as for the ids map position 0 stores local connections and position
158    // 1 store remotes ones
159    connections: [Connections; 2],
160}
161
162impl AgentTypeState {
163    fn new(agent_id: u64, conn: u64, is_local: bool) -> Self {
164        let mut type_state = AgentTypeState::default();
165        let v = vec![conn];
166        if is_local {
167            type_state.connections[0].insert(conn);
168            type_state.ids.insert(agent_id, [v, vec![]]);
169        } else {
170            type_state.connections[1].insert(conn);
171            type_state.ids.insert(agent_id, [vec![], v]);
172        }
173        type_state
174    }
175
176    fn insert(&mut self, agent_id: u64, conn: u64, is_local: bool) {
177        let mut index = 0;
178        if !is_local {
179            index = 1;
180        }
181        self.connections[index].insert(conn);
182
183        match self.ids.get_mut(&agent_id) {
184            None => {
185                // the agent id does not exists
186                let mut connections = [vec![], vec![]];
187                connections[index].push(conn);
188                self.ids.insert(agent_id, connections);
189            }
190            Some(v) => {
191                v[index].push(conn);
192            }
193        }
194    }
195
196    fn remove(
197        &mut self,
198        agent_id: &u64,
199        conn: u64,
200        is_local: bool,
201    ) -> Result<(), SubscriptionTableError> {
202        match self.ids.get_mut(agent_id) {
203            None => {
204                warn!("agent id {} not found", agent_id);
205                Err(SubscriptionTableError::AgentIdNotFound)
206            }
207            Some(connection_ids) => {
208                let mut index = 0;
209                if !is_local {
210                    index = 1;
211                }
212                self.connections[index].remove(conn)?;
213                for (i, c) in connection_ids[index].iter().enumerate() {
214                    if *c == conn {
215                        connection_ids[index].swap_remove(i);
216                        // if both vectors are empty remove the agent id from the tabales
217                        if connection_ids[0].is_empty() && connection_ids[1].is_empty() {
218                            self.ids.remove(agent_id);
219                        }
220                        break;
221                    }
222                }
223                Ok(())
224            }
225        }
226    }
227
228    fn get_one_connection(
229        &self,
230        agent_id: Option<u64>,
231        incoming_conn: u64,
232        get_local_connection: bool,
233    ) -> Option<u64> {
234        let mut index = 0;
235        if !get_local_connection {
236            index = 1;
237        }
238        match agent_id {
239            None => self.connections[index].get_one(incoming_conn),
240            Some(id) => {
241                let val = self.ids.get(&id);
242                match val {
243                    None => {
244                        debug!(
245                            "cannot find out connection, agent id does not exists {:?}",
246                            id
247                        );
248                        None
249                    }
250                    Some(vec) => {
251                        if vec[index].is_empty() {
252                            // no connections available
253                            return None;
254                        }
255
256                        if vec[index].len() == 1 {
257                            if vec[index][0] == incoming_conn {
258                                // cannot return the incoming interface d
259                                debug!("the only available connection cannot be used");
260                                return None;
261                            } else {
262                                return Some(vec[index][0]);
263                            }
264                        }
265
266                        // we need to iterate an find a value starting from a random point in the vec
267                        let mut rng = rand::rng();
268                        let pos = rng.random_range(0..vec.len());
269                        let mut stop = false;
270                        let mut i = pos;
271                        while !stop {
272                            if vec[index][pos] != incoming_conn {
273                                return Some(vec[index][pos]);
274                            }
275                            i = (i + 1) % vec[index].len();
276                            if i == pos {
277                                stop = true;
278                            }
279                        }
280                        debug!("no output connection available");
281                        None
282                    }
283                }
284            }
285        }
286    }
287
288    fn get_all_connections(
289        &self,
290        agent_id: Option<u64>,
291        incoming_conn: u64,
292        get_local_connection: bool,
293    ) -> Option<Vec<u64>> {
294        let mut index = 0;
295        if !get_local_connection {
296            index = 1;
297        }
298        match agent_id {
299            None => self.connections[index].get_all(incoming_conn),
300            Some(id) => {
301                let val = self.ids.get(&id);
302                match val {
303                    None => {
304                        debug!(
305                            "cannot find out connection, agent id does not exists {:?}",
306                            id
307                        );
308                        None
309                    }
310                    Some(vec) => {
311                        if vec[index].is_empty() {
312                            // should never happen
313                            return None;
314                        }
315
316                        if vec[index].len() == 1 {
317                            if vec[index][0] == incoming_conn {
318                                // cannot return the incoming interface d
319                                debug!("the only available connection cannot be used");
320                                return None;
321                            } else {
322                                return Some(vec[index].clone());
323                            }
324                        }
325
326                        // we need to iterate over the vector and remove the incoming connection
327                        let mut out = Vec::new();
328                        for c in vec[index].iter() {
329                            if *c != incoming_conn {
330                                out.push(*c);
331                            }
332                        }
333                        if out.is_empty() { None } else { Some(out) }
334                    }
335                }
336            }
337        }
338    }
339}
340
341#[derive(Debug, Default)]
342pub struct SubscriptionTableImpl {
343    // subscriptions table
344    // agent_type -> type state
345    // if a subscription comes for a specific agent_id, it is added
346    // to that specific agent_id, otherwise the connection is added
347    // to the DEFAULT_AGENT_ID
348    table: RwLock<HashMap<AgentType, AgentTypeState>>,
349    // connections tables
350    // conn_index -> set(agent)
351    connections: RwLock<HashMap<u64, HashSet<Agent>>>,
352}
353
354impl Display for SubscriptionTableImpl {
355    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
356        // print main table
357        let table = self.table.read();
358        writeln!(f, "Subscription Table")?;
359        for (k, v) in table.iter() {
360            writeln!(f, "Type: {:?}", k)?;
361            writeln!(f, "  Agents:")?;
362            for (id, conn) in v.ids.iter() {
363                writeln!(f, "    Agent id: {}", id)?;
364                if conn[0].is_empty() {
365                    writeln!(f, "       Local Connections:")?;
366                    writeln!(f, "         None")?;
367                } else {
368                    writeln!(f, "       Local Connections:")?;
369                    for c in conn[0].iter() {
370                        writeln!(f, "         Connection: {}", c)?;
371                    }
372                }
373                if conn[1].is_empty() {
374                    writeln!(f, "       Remote Connections:")?;
375                    writeln!(f, "         None")?;
376                } else {
377                    writeln!(f, "       Remote Connections:")?;
378                    for c in conn[1].iter() {
379                        writeln!(f, "         Connection: {}", c)?;
380                    }
381                }
382            }
383        }
384
385        Ok(())
386    }
387}
388
389fn add_subscription_to_sub_table(
390    agent: &Agent,
391    conn: u64,
392    is_local: bool,
393    mut table: RwLockWriteGuard<'_, RawRwLock, HashMap<AgentType, AgentTypeState>>,
394) {
395    match table.get_mut(agent.agent_type()) {
396        None => {
397            let uid = agent.agent_id();
398            debug!(
399                "subscription table: add first subscription for type {}, agent_id {} on connection {}",
400                agent.agent_type(),
401                uid,
402                conn,
403            );
404            // the subscription does not exists, init
405            // create and init type state
406            let state = AgentTypeState::new(uid, conn, is_local);
407
408            // insert the map in the table
409            table.insert(agent.agent_type().clone(), state);
410        }
411        Some(state) => {
412            state.insert(agent.agent_id(), conn, is_local);
413        }
414    }
415}
416
417fn add_subscription_to_connection(
418    agent: &Agent,
419    conn_index: u64,
420    mut map: RwLockWriteGuard<'_, RawRwLock, HashMap<u64, HashSet<Agent>>>,
421) -> Result<(), SubscriptionTableError> {
422    let set = map.get_mut(&conn_index);
423    match set {
424        None => {
425            debug!(
426                "add first subscription for type {}, agent_id {} on connection {}",
427                agent.agent_type(),
428                agent.agent_id(),
429                conn_index,
430            );
431            let mut set = HashSet::new();
432            set.insert(agent.clone());
433            map.insert(conn_index, set);
434        }
435        Some(s) => {
436            if !s.insert(agent.clone()) {
437                warn!(
438                    "subscription for type {}, agent_id {} already exists for connection {}, ignore the message",
439                    agent.agent_type(),
440                    agent.agent_id(),
441                    conn_index,
442                );
443                return Ok(());
444            }
445        }
446    }
447    debug!(
448        "subscription for type {}, agent_id {} successfully added on connection {}",
449        agent.agent_type(),
450        agent.agent_id(),
451        conn_index,
452    );
453    Ok(())
454}
455
456fn remove_subscription_from_sub_table(
457    agent: &Agent,
458    conn_index: u64,
459    is_local: bool,
460    mut table: RwLockWriteGuard<'_, RawRwLock, HashMap<AgentType, AgentTypeState>>,
461) -> Result<(), SubscriptionTableError> {
462    match table.get_mut(agent.agent_type()) {
463        None => {
464            debug!("subscription not found{:?}", agent.agent_type());
465            Err(SubscriptionTableError::SubscriptionNotFound)
466        }
467        Some(state) => {
468            state.remove(&agent.agent_id(), conn_index, is_local)?;
469            // we may need to remove the state
470            if state.ids.is_empty() {
471                table.remove(agent.agent_type());
472            }
473            Ok(())
474        }
475    }
476}
477
478fn remove_subscription_from_connection(
479    agent: &Agent,
480    conn_index: u64,
481    mut map: RwLockWriteGuard<'_, RawRwLock, HashMap<u64, HashSet<Agent>>>,
482) -> Result<(), SubscriptionTableError> {
483    let set = map.get_mut(&conn_index);
484    match set {
485        None => {
486            warn!("connection id {:?} not found", conn_index);
487            return Err(SubscriptionTableError::ConnectionIdNotFound);
488        }
489        Some(s) => {
490            if !s.remove(agent) {
491                warn!(
492                    "subscription for type {}, agent_id {} not found on connection {}",
493                    agent.agent_type(),
494                    agent.agent_id(),
495                    conn_index,
496                );
497                return Err(SubscriptionTableError::SubscriptionNotFound);
498            }
499            if s.is_empty() {
500                map.remove(&conn_index);
501            }
502        }
503    }
504    debug!(
505        "subscription for type {}, agent_id {} successfully removed on connection {}",
506        agent.agent_type(),
507        agent.agent_id(),
508        conn_index,
509    );
510    Ok(())
511}
512
513impl SubscriptionTable for SubscriptionTableImpl {
514    fn for_each<F>(&self, mut f: F)
515    where
516        F: FnMut(&AgentType, u64, &[u64], &[u64]),
517    {
518        let table = self.table.read();
519
520        for (k, v) in table.iter() {
521            for (id, conn) in v.ids.iter() {
522                f(k, *id, conn[0].as_ref(), conn[1].as_ref());
523            }
524        }
525    }
526
527    fn add_subscription(
528        &self,
529        agent_type: AgentType,
530        agent_uid: Option<u64>,
531        conn: u64,
532        is_local: bool,
533    ) -> Result<(), SubscriptionTableError> {
534        let agent = Agent::new(agent_type, agent_uid.unwrap_or(DEFAULT_AGENT_ID));
535        {
536            let conn_table = self.connections.read();
537            match conn_table.get(&conn) {
538                None => {}
539                Some(set) => {
540                    if set.contains(&agent) {
541                        debug!(
542                            "subscription {:?} on connection {:?} already exists, ignore the message",
543                            agent, conn
544                        );
545                        return Ok(());
546                    }
547                }
548            }
549        }
550        {
551            let table = self.table.write();
552            add_subscription_to_sub_table(&agent, conn, is_local, table);
553        }
554        {
555            let conn_table = self.connections.write();
556            add_subscription_to_connection(&agent, conn, conn_table)?;
557        }
558        Ok(())
559    }
560
561    fn remove_subscription(
562        &self,
563        agent_type: AgentType,
564        agent_id: Option<u64>,
565        conn: u64,
566        is_local: bool,
567    ) -> Result<(), SubscriptionTableError> {
568        let agent = Agent::new(agent_type, agent_id.unwrap_or(DEFAULT_AGENT_ID));
569        {
570            let table = self.table.write();
571
572            remove_subscription_from_sub_table(&agent, conn, is_local, table)?
573        }
574        {
575            let conn_table = self.connections.write();
576            remove_subscription_from_connection(&agent, conn, conn_table)?
577        }
578        Ok(())
579    }
580
581    fn remove_connection(&self, conn: u64, is_local: bool) -> Result<(), SubscriptionTableError> {
582        {
583            let conn_map = self.connections.read();
584            let set = conn_map.get(&conn);
585            if set.is_none() {
586                return Err(SubscriptionTableError::ConnectionIdNotFound);
587            }
588            for agent in set.unwrap() {
589                let table = self.table.write();
590                debug!("remove subscription {} from connection {}", agent, conn);
591                remove_subscription_from_sub_table(agent, conn, is_local, table)?;
592            }
593        }
594        {
595            let mut conn_map = self.connections.write();
596            conn_map.remove(&conn); // here the connection must exists.
597        }
598        Ok(())
599    }
600
601    fn match_one(
602        &self,
603        agent_type: AgentType,
604        agent_id: Option<u64>,
605        incoming_conn: u64,
606    ) -> Result<u64, SubscriptionTableError> {
607        let table = self.table.read();
608        match table.get(&agent_type) {
609            None => {
610                debug!("match not found for type {:}", agent_type);
611                Err(SubscriptionTableError::NoMatch(format!(
612                    "{}, {:?}",
613                    agent_type, agent_id
614                )))
615            }
616            Some(state) => {
617                // first try to send the message to the local connections
618                // if no local connection exists or the message cannot
619                // be sent try on remote ones
620                let local_out = state.get_one_connection(agent_id, incoming_conn, true);
621                if let Some(out) = local_out {
622                    return Ok(out);
623                }
624                let remote_out = state.get_one_connection(agent_id, incoming_conn, false);
625                if let Some(out) = remote_out {
626                    return Ok(out);
627                }
628                error!("no output connection available");
629                Err(SubscriptionTableError::NoMatch(format!(
630                    "{}, {:?}",
631                    agent_type, agent_id
632                )))
633            }
634        }
635    }
636
637    fn match_all(
638        &self,
639        agent_type: AgentType,
640        agent_id: Option<u64>,
641        incoming_conn: u64,
642    ) -> Result<Vec<u64>, SubscriptionTableError> {
643        let table = self.table.read();
644        match table.get(&agent_type) {
645            None => {
646                debug!("match not found for type {:}", agent_type);
647                Err(SubscriptionTableError::NoMatch(format!(
648                    "{}, {:?}",
649                    agent_type, agent_id
650                )))
651            }
652            Some(state) => {
653                // first try to send the message to the local connections
654                // if no local connection exists or the message cannot
655                // be sent try on remote ones
656                let local_out = state.get_all_connections(agent_id, incoming_conn, true);
657                if let Some(out) = local_out {
658                    return Ok(out);
659                }
660                let remote_out = state.get_all_connections(agent_id, incoming_conn, false);
661                if let Some(out) = remote_out {
662                    return Ok(out);
663                }
664                error!("no output connection available");
665                Err(SubscriptionTableError::NoMatch(format!(
666                    "{}, {:?}",
667                    agent_type, agent_id
668                )))
669            }
670        }
671    }
672}
673
674#[cfg(test)]
675mod tests {
676    use super::*;
677
678    use tracing_test::traced_test;
679
680    #[test]
681    #[traced_test]
682    fn test_table() {
683        let agent_type1 = AgentType::from_strings("Cisco", "Default", "type_ONE");
684        let agent_type2 = AgentType::from_strings("Cisco", "Default", "type_TWO");
685        let agent_type3 = AgentType::from_strings("Cisco", "Default", "type_THREE");
686
687        let t = SubscriptionTableImpl::default();
688
689        assert_eq!(
690            t.add_subscription(agent_type1.clone(), None, 1, false),
691            Ok(())
692        );
693        assert_eq!(
694            t.add_subscription(agent_type1.clone(), None, 2, false),
695            Ok(())
696        );
697        assert_eq!(
698            t.add_subscription(agent_type1.clone(), Some(1), 3, false),
699            Ok(())
700        );
701        assert_eq!(
702            t.add_subscription(agent_type2.clone(), Some(2), 3, false),
703            Ok(())
704        );
705
706        // returns three matches on connection 1,2,3
707        let out = t.match_all(agent_type1.clone(), None, 100).unwrap();
708        assert_eq!(out.len(), 3);
709        assert!(out.contains(&1));
710        assert!(out.contains(&2));
711        assert!(out.contains(&3));
712
713        // return two matches on connection 2,3
714        let out = t.match_all(agent_type1.clone(), None, 1).unwrap();
715        assert_eq!(out.len(), 2);
716        assert!(out.contains(&2));
717        assert!(out.contains(&3));
718
719        assert_eq!(
720            t.remove_subscription(agent_type1.clone(), None, 2, false),
721            Ok(())
722        );
723
724        // return two matches on connection 1,3
725        let out = t.match_all(agent_type1.clone(), None, 100).unwrap();
726        assert_eq!(out.len(), 2);
727        assert!(out.contains(&1));
728        assert!(out.contains(&3));
729
730        assert_eq!(
731            t.remove_subscription(agent_type1.clone(), Some(1), 3, false),
732            Ok(())
733        );
734
735        // return one matches on connection 1
736        let out = t.match_all(agent_type1.clone(), None, 100).unwrap();
737        assert_eq!(out.len(), 1);
738        assert!(out.contains(&1));
739
740        // return no match
741        assert_eq!(
742            t.match_all(agent_type1.clone(), None, 1),
743            Err(SubscriptionTableError::NoMatch(format!(
744                "{}, {:?}",
745                agent_type1,
746                Option::<u64>::None
747            )))
748        );
749
750        // add subscription again
751        assert_eq!(
752            t.add_subscription(agent_type1.clone(), Some(1), 2, false),
753            Ok(())
754        );
755
756        // returns two matches on connection 1 and 2
757        let out = t.match_all(agent_type1.clone(), None, 100).unwrap();
758        assert_eq!(out.len(), 2);
759        assert!(out.contains(&1));
760        assert!(out.contains(&2));
761
762        // run multiple times for randomenes
763        for _ in 0..20 {
764            let out = t.match_one(agent_type1.clone(), None, 100).unwrap();
765            if out != 1 && out != 2 {
766                // the output must be 1 or 2
767                panic!("the output must be 1 or 2");
768            }
769        }
770
771        // return connection 2
772        let out = t.match_one(agent_type1.clone(), Some(1), 100).unwrap();
773        assert_eq!(out, 2);
774
775        // return connection 3
776        let out = t.match_one(agent_type2.clone(), Some(2), 100).unwrap();
777        assert_eq!(out, 3);
778
779        assert_eq!(t.remove_connection(2, false), Ok(()));
780
781        // returns one match on connection 1
782        let out = t.match_all(agent_type1.clone(), None, 100).unwrap();
783        assert_eq!(out.len(), 1);
784        assert!(out.contains(&1));
785
786        assert_eq!(
787            t.add_subscription(agent_type2.clone(), Some(2), 4, false),
788            Ok(())
789        );
790
791        // run multiple times for randomness
792        for _ in 0..20 {
793            let out = t.match_one(agent_type2.clone(), Some(2), 100).unwrap();
794            if out != 3 && out != 4 {
795                // the output must be 2 or 4
796                panic!("the output must be 2 or 4");
797            }
798        }
799
800        assert_eq!(
801            t.remove_subscription(agent_type2.clone(), Some(2), 4, false),
802            Ok(())
803        );
804
805        // test local vs remote
806        assert_eq!(
807            t.add_subscription(agent_type1.clone(), None, 2, true),
808            Ok(())
809        );
810
811        // returns one match on connection 2
812        let out = t.match_all(agent_type1.clone(), None, 100).unwrap();
813        assert_eq!(out.len(), 1);
814        assert!(out.contains(&2));
815
816        // returns one match on connection 2
817        let out = t.match_one(agent_type1.clone(), None, 100).unwrap();
818        assert_eq!(out, 2);
819
820        // fallback on remote connection and return one match on connection 1
821        let out = t.match_all(agent_type1.clone(), None, 2).unwrap();
822        assert_eq!(out.len(), 1);
823        assert!(out.contains(&1));
824
825        // same here
826        let out = t.match_one(agent_type1.clone(), None, 2).unwrap();
827        assert_eq!(out, 1);
828
829        // test errors
830        assert_eq!(
831            t.remove_connection(4, false),
832            Err(SubscriptionTableError::ConnectionIdNotFound)
833        );
834        assert_eq!(
835            t.match_one(agent_type1.clone(), Some(1), 100),
836            Err(SubscriptionTableError::NoMatch(format!(
837                "{}, {:?}",
838                agent_type1,
839                Some(1)
840            )))
841        );
842        assert_eq!(
843            // this generates a warning
844            t.add_subscription(agent_type2.clone(), Some(2), 3, false),
845            Ok(())
846        );
847        assert_eq!(
848            t.remove_subscription(agent_type3.clone(), None, 2, false),
849            Err(SubscriptionTableError::SubscriptionNotFound)
850        );
851        assert_eq!(
852            t.remove_subscription(agent_type2.clone(), None, 2, false),
853            Err(SubscriptionTableError::AgentIdNotFound)
854        );
855    }
856
857    #[test]
858    fn test_iter() {
859        let agent_type1 = AgentType::from_strings("Org", "Default", "type_ONE");
860        let agent_type2 = AgentType::from_strings("Org", "Default", "type_TWO");
861
862        let t = SubscriptionTableImpl::default();
863
864        assert_eq!(
865            t.add_subscription(agent_type1.clone(), None, 1, false),
866            Ok(())
867        );
868        assert_eq!(
869            t.add_subscription(agent_type1.clone(), None, 2, false),
870            Ok(())
871        );
872        assert_eq!(
873            t.add_subscription(agent_type2.clone(), None, 3, true),
874            Ok(())
875        );
876
877        let mut h = HashMap::new();
878
879        t.for_each(|k, id, local, remote| {
880            println!(
881                "key: {}, id: {}, local: {:?}, remote: {:?}",
882                k, id, local, remote
883            );
884
885            h.insert(k.clone(), (id, local.to_vec(), remote.to_vec()));
886        });
887
888        assert_eq!(h.len(), 2);
889        assert_eq!(h[&agent_type1].0, DEFAULT_AGENT_ID);
890        assert_eq!(h[&agent_type1].1, vec![]);
891        assert_eq!(h[&agent_type1].2, vec![1, 2]);
892
893        assert_eq!(h[&agent_type2].0, DEFAULT_AGENT_ID);
894        assert_eq!(h[&agent_type2].1, vec![3]);
895        assert_eq!(h[&agent_type2].2, vec![]);
896    }
897}