dmfr_dataset_reader/
lib.rs

1/// Copyright: Kyler Chin <kyler@catenarymaps.org>
2/// Catenary Transit Initiatives
3/// Removal of the attribution is not allowed, as covered under the AGPL license
4
5use dmfr::*;
6use serde_json::Error as SerdeError;
7use std::collections::{HashMap, HashSet};
8use std::error::Error;
9use std::fs;
10use std::sync::Arc;
11
/// One operator linked to a feed, as stored in the feed-to-operators table.
///
/// Equality and hashing take both fields into account, so two pairs naming the same
/// operator under different agency ids are distinct values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OperatorPairInfo {
    /// Onestop ID of the operator.
    pub operator_id: String,
    /// GTFS agency id recorded for this operator/feed pairing, if one was given.
    pub gtfs_agency_id: Option<String>,
}
17
/// One feed linked to an operator, as stored in the operator-to-feeds table.
///
/// Equality and hashing take both fields into account, so two pairs naming the same
/// feed under different agency ids are distinct values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FeedPairInfo {
    /// Onestop ID of the feed.
    pub feed_onestop_id: String,
    /// GTFS agency id recorded for this operator/feed pairing, if one was given.
    pub gtfs_agency_id: Option<String>,
}
23
/// Key type of the feed-indexed maps: the id of a feed.
pub type FeedId = String;

/// Key type of the operator-indexed maps: the Onestop ID of an operator.
pub type OperatorId = String;
26
27#[derive(Debug)]
28pub struct ReturnDmfrAnalysis {
29    pub feed_hashmap: HashMap<FeedId, dmfr::Feed>,
30    pub operator_hashmap: HashMap<OperatorId, dmfr::Operator>,
31    pub operator_to_feed_hashmap: HashMap<OperatorId, Vec<FeedPairInfo>>,
32    pub feed_to_operator_pairs_hashmap: HashMap<FeedId, Vec<OperatorPairInfo>>,
33    pub list_of_bad_files: Option<Vec<String>>
34}
35
36pub fn process_feed(
37    feed: &dmfr::Feed,
38    feed_hashmap: &mut HashMap<FeedId, dmfr::Feed>,
39    operator_hashmap: &mut HashMap<OperatorId, dmfr::Operator>,
40    operator_to_feed_hashmap: &mut HashMap<OperatorId, Vec<FeedPairInfo>>,
41    feed_to_operator_pairs_hashmap: &mut HashMap<FeedId, Vec<OperatorPairInfo>>,
42) -> () {
43    feed_hashmap.entry(feed.id.clone()).or_insert(feed.clone());
44
45    for operator in feed.operators.iter() {
46        process_operator(
47            &operator,
48            feed_hashmap,
49            operator_hashmap,
50            operator_to_feed_hashmap,
51            feed_to_operator_pairs_hashmap,
52            Some(&feed.id),
53        );
54
55        operator_to_feed_hashmap
56            .entry(operator.onestop_id.clone())
57            .and_modify(|associated_feeds| {
58                let set_of_existing_ids: HashSet<String> = HashSet::from_iter(
59                    associated_feeds
60                        .iter()
61                        .map(|feed_item| feed_item.feed_onestop_id.clone()),
62                );
63
64                if !set_of_existing_ids.contains(&feed.id) {
65                    associated_feeds.push(FeedPairInfo {
66                        feed_onestop_id: feed.id.clone(),
67                        gtfs_agency_id: None,
68                    });
69                }
70            })
71            .or_insert(vec![FeedPairInfo {
72                feed_onestop_id: feed.id.clone(),
73                gtfs_agency_id: None,
74            }]);
75
76        feed_to_operator_pairs_hashmap
77            .entry(feed.id.clone())
78            .and_modify(|operator_pairs| {
79                let set_of_existing_operator_ids: HashSet<String> = HashSet::from_iter(
80                    operator_pairs
81                        .iter()
82                        .map(|operator_pair| operator_pair.operator_id.clone()),
83                );
84
85                if !set_of_existing_operator_ids.contains(&operator.onestop_id.clone()) {
86                    operator_pairs.push(OperatorPairInfo {
87                        operator_id: operator.onestop_id.clone(),
88                        gtfs_agency_id: None,
89                    });
90                }
91            })
92            .or_insert(vec![OperatorPairInfo {
93                operator_id: operator.onestop_id.clone(),
94                gtfs_agency_id: None,
95            }]);
96    }
97}
98
99pub fn process_operator(
100    operator: &dmfr::Operator,
101    feed_hashmap: &mut HashMap<FeedId, dmfr::Feed>,
102    operator_hashmap: &mut HashMap<OperatorId, dmfr::Operator>,
103    operator_to_feed_hashmap: &mut HashMap<OperatorId, Vec<FeedPairInfo>>,
104    feed_to_operator_pairs_hashmap: &mut HashMap<FeedId, Vec<OperatorPairInfo>>,
105    parent_feed_id: Option<&str>,
106) -> () {
107    operator_hashmap
108        .entry(operator.onestop_id.clone())
109        .or_insert(operator.clone());
110
111    for associated_feed in operator.associated_feeds.iter() {
112        let mut associated_feed_insertion: FeedPairInfo =
113            match associated_feed.feed_onestop_id.as_ref() {
114                Some(feed_onestop_id) => FeedPairInfo {
115                    feed_onestop_id: feed_onestop_id.clone(),
116                    gtfs_agency_id: associated_feed.feed_onestop_id.clone(),
117                },
118                None => FeedPairInfo {
119                    feed_onestop_id: String::from(*parent_feed_id.as_ref().unwrap()),
120                    gtfs_agency_id: associated_feed.feed_onestop_id.clone(),
121                },
122            };
123
124        //if associated_feed_insertion.feed_onestop_id == Some(String::from("f-ucla~bruinbus~rt")) {
125        //    println!("Bruin realtime feed found! {:?}", associated_feed_insertion);
126        //}
127
128        operator_to_feed_hashmap
129            .entry(operator.onestop_id.clone())
130            .and_modify(|associated_feeds| {
131                let set_of_existing_ids: HashSet<String> = HashSet::from_iter(
132                    associated_feeds
133                        .iter()
134                        .map(|feed_item| feed_item.feed_onestop_id.clone()),
135                );
136
137                if !set_of_existing_ids.contains(&associated_feed_insertion.feed_onestop_id) {
138                    associated_feeds.push(associated_feed_insertion.clone())
139                }
140            })
141            .or_insert(vec![associated_feed_insertion.clone()]);
142
143        feed_to_operator_pairs_hashmap
144            .entry(associated_feed_insertion.feed_onestop_id.clone())
145            .and_modify(|operator_pairs| {
146                let set_of_existing_operator_ids: HashSet<String> = HashSet::from_iter(
147                    operator_pairs
148                        .iter()
149                        .map(|operator_pair| operator_pair.operator_id.clone()),
150                );
151
152                if !set_of_existing_operator_ids.contains(&operator.onestop_id.clone()) {
153                    operator_pairs.push(OperatorPairInfo {
154                        operator_id: operator.onestop_id.clone(),
155                        gtfs_agency_id: associated_feed_insertion.gtfs_agency_id.clone(),
156                    });
157                }
158            })
159            .or_insert(vec![OperatorPairInfo {
160                operator_id: operator.onestop_id.clone(),
161                gtfs_agency_id: associated_feed_insertion.gtfs_agency_id.clone(),
162            }]);
163    }
164}
165
166pub fn read_folders(path: &str) -> Result<ReturnDmfrAnalysis, Box<dyn Error + Send + Sync>> {
167    let feed_entries = fs::read_dir(format!("{}/feeds/", path))?;
168
169    let mut feed_hashmap: HashMap<FeedId, dmfr::Feed> = HashMap::new();
170    let mut operator_hashmap: HashMap<OperatorId, dmfr::Operator> = HashMap::new();
171    let mut operator_to_feed_hashmap: HashMap<OperatorId, Vec<FeedPairInfo>> = HashMap::new();
172    let mut feed_to_operator_pairs_hashmap: HashMap<FeedId, Vec<OperatorPairInfo>> = HashMap::new();
173
174    let mut list_of_bad_files:Vec<String> = vec![];
175
176    for entry in feed_entries {
177        if let Ok(entry) = entry {
178            if let Some(file_name) = entry.file_name().to_str() {
179                //println!("{}", file_name);
180                let contents = fs::read_to_string(format!("{}/feeds/{}", path, file_name));
181                if contents.is_err() {
182                    eprintln!(
183                        "Error Reading Feed File {}: {}",
184                        file_name,
185                        contents.unwrap_err()
186                    );
187                    continue;
188                }
189                let dmfrinfo: Result<dmfr::DistributedMobilityFeedRegistry, SerdeError> =
190                    serde_json::from_str(&contents.unwrap());
191                match dmfrinfo {
192                    Ok(dmfrinfo) => {
193                        for feed in dmfrinfo.feeds.into_iter() {
194                            process_feed(
195                                &feed,
196                                &mut feed_hashmap,
197                                &mut operator_hashmap,
198                                &mut operator_to_feed_hashmap,
199                                &mut feed_to_operator_pairs_hashmap,
200                            );
201                        }
202
203                        for operator in dmfrinfo.operators.into_iter() {
204                            process_operator(
205                                &operator,
206                                &mut feed_hashmap,
207                                &mut operator_hashmap,
208                                &mut operator_to_feed_hashmap,
209                                &mut feed_to_operator_pairs_hashmap,
210                                None,
211                            );
212                        }
213                    }
214                    Err(_) => {
215                        list_of_bad_files.push(file_name.to_string());
216                    }
217                }
218            }
219        }
220    }
221
222    //cross check feed_to_operator_hashmap into feed_to_operator_pairs_hashmap
223
224    Ok(ReturnDmfrAnalysis {
225        feed_hashmap,
226        operator_hashmap,
227        operator_to_feed_hashmap,
228        feed_to_operator_pairs_hashmap,
229        list_of_bad_files: Some(list_of_bad_files)
230    })
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end check that reads the `transitland-atlas/` directory relative to the
    /// working directory, dumps both link tables to files, and verifies that two known
    /// realtime feeds are present in the feed-to-operators table.
    #[test]
    fn test() {
        /// Writes the pretty `Debug` rendering of `value` to `file_path`.
        fn dump_debug(file_path: &str, value: &impl std::fmt::Debug) {
            fs::write(file_path, format!("{:#?}", value)).expect("Unable to write file");
        }

        println!("MAIN TEST");
        let analysis = read_folders("transitland-atlas/").unwrap();

        let feed_count = analysis.feed_hashmap.len();
        assert!(feed_count > 1000);

        // Note: the dumps are Rust `Debug` output, despite the `.json` file names.
        dump_debug(
            "operator_to_feed_hashmap.json",
            &analysis.operator_to_feed_hashmap,
        );
        dump_debug(
            "feed_to_operator_pairs_hashmap.json",
            &analysis.feed_to_operator_pairs_hashmap,
        );

        let operator_count = analysis.operator_hashmap.len();
        println!("{} feeds across {} operators", feed_count, operator_count);

        let operator_link_count = analysis.operator_to_feed_hashmap.len();
        println!("Operator to feed hashmap length {}", operator_link_count);

        let feed_link_count = analysis.feed_to_operator_pairs_hashmap.len();
        println!("feed_to_operator_pairs_hashmap length {}", feed_link_count);

        let feed_links = &analysis.feed_to_operator_pairs_hashmap;
        assert!(feed_links.contains_key("f-ucla~bruinbus~rt"));
        assert!(feed_links.contains_key("f-spokanetransitauthority~rt"));
    }
}