dmfr_dataset_reader/
lib.rs
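//! Reader for DMFR (Distributed Mobility Feed Registry) JSON files, such as the
//! `transitland-atlas` dataset used in the tests below. Builds lookup tables for
//! feeds and operators and the links between them.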

/// Copyright: Kyler Chin <kyler@catenarymaps.org>
/// Catenary Transit Initiatives
/// Removal of the attribution is not allowed, as covered under the AGPL license
use dmfr::*;
use serde_json::Error as SerdeError;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fs;

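/// An operator reference attached to a feed: the operator's Onestop ID plus, if known,
/// the GTFS `agency_id` that represents the operator inside that feed.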
#[derive(Debug, Clone)]
pub struct OperatorPairInfo {
    pub operator_id: String,
    pub gtfs_agency_id: Option<String>,
}

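/// A feed reference attached to an operator: the feed's Onestop ID plus, if known,
/// the GTFS `agency_id` the operator uses within that feed.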
#[derive(Debug, Clone)]
pub struct FeedPairInfo {
    pub feed_onestop_id: String,
    pub gtfs_agency_id: Option<String>,
}

pub type FeedId = String;
pub type OperatorId = String;

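/// Combined output of [`read_folders`]: lookup tables for feeds and operators,
/// the many-to-many links between them, and any files that failed to parse.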
#[derive(Debug)]
pub struct ReturnDmfrAnalysis {
    pub feed_hashmap: HashMap<FeedId, dmfr::Feed>,
    pub operator_hashmap: HashMap<OperatorId, dmfr::Operator>,
    pub operator_to_feed_hashmap: HashMap<OperatorId, Vec<FeedPairInfo>>,
    pub feed_to_operator_pairs_hashmap: HashMap<FeedId, Vec<OperatorPairInfo>>,
    pub list_of_bad_files: Option<Vec<String>>,
}

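/// Records a single DMFR feed in `feed_hashmap` and wires up feed <-> operator links
/// for every operator declared inline on the feed (each of which is also passed to
/// [`process_operator`]).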
pub fn process_feed(
    feed: &dmfr::Feed,
    feed_hashmap: &mut HashMap<FeedId, dmfr::Feed>,
    operator_hashmap: &mut HashMap<OperatorId, dmfr::Operator>,
    operator_to_feed_hashmap: &mut HashMap<OperatorId, Vec<FeedPairInfo>>,
    feed_to_operator_pairs_hashmap: &mut HashMap<FeedId, Vec<OperatorPairInfo>>,
) {
    feed_hashmap.entry(feed.id.clone()).or_insert(feed.clone());

    for operator in feed.operators.iter() {
        process_operator(
            operator,
            feed_hashmap,
            operator_hashmap,
            operator_to_feed_hashmap,
            feed_to_operator_pairs_hashmap,
            Some(&feed.id),
        );

        operator_to_feed_hashmap
            .entry(operator.onestop_id.clone())
            .and_modify(|associated_feeds| {
                let set_of_existing_ids: HashSet<String> = HashSet::from_iter(
                    associated_feeds
                        .iter()
                        .map(|feed_item| feed_item.feed_onestop_id.clone()),
                );

                if !set_of_existing_ids.contains(&feed.id) {
                    associated_feeds.push(FeedPairInfo {
                        feed_onestop_id: feed.id.clone(),
                        gtfs_agency_id: None,
                    });
                }
            })
            .or_insert(vec![FeedPairInfo {
                feed_onestop_id: feed.id.clone(),
                gtfs_agency_id: None,
            }]);

        feed_to_operator_pairs_hashmap
            .entry(feed.id.clone())
            .and_modify(|operator_pairs| {
                let set_of_existing_operator_ids: HashSet<String> = HashSet::from_iter(
                    operator_pairs
                        .iter()
                        .map(|operator_pair| operator_pair.operator_id.clone()),
                );

                if !set_of_existing_operator_ids.contains(&operator.onestop_id) {
                    operator_pairs.push(OperatorPairInfo {
                        operator_id: operator.onestop_id.clone(),
                        gtfs_agency_id: None,
                    });
                }
            })
            .or_insert(vec![OperatorPairInfo {
                operator_id: operator.onestop_id.clone(),
                gtfs_agency_id: None,
            }]);
    }
}

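/// Records a single DMFR operator in `operator_hashmap` and wires up operator <-> feed
/// links for each entry in its `associated_feeds`. If an associated feed omits its
/// `feed_onestop_id`, the `parent_feed_id` (the feed this operator was declared under,
/// if any) is used instead.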
pub fn process_operator(
    operator: &dmfr::Operator,
    feed_hashmap: &mut HashMap<FeedId, dmfr::Feed>,
    operator_hashmap: &mut HashMap<OperatorId, dmfr::Operator>,
    operator_to_feed_hashmap: &mut HashMap<OperatorId, Vec<FeedPairInfo>>,
    feed_to_operator_pairs_hashmap: &mut HashMap<FeedId, Vec<OperatorPairInfo>>,
    parent_feed_id: Option<&str>,
) {
    operator_hashmap
        .entry(operator.onestop_id.clone())
        .or_insert(operator.clone());

    for associated_feed in operator.associated_feeds.iter() {
        let associated_feed_insertion: FeedPairInfo = match associated_feed.feed_onestop_id.as_ref()
        {
            Some(feed_onestop_id) => FeedPairInfo {
                feed_onestop_id: feed_onestop_id.clone(),
                gtfs_agency_id: associated_feed.gtfs_agency_id.clone(),
            },
            None => FeedPairInfo {
                feed_onestop_id: {
                    if let Some(parent_feed_id) = parent_feed_id {
                        parent_feed_id.to_string()
                    } else {
                        eprintln!("Warning: Operator {} has an associated feed with no feed_onestop_id and no parent feed id", operator.onestop_id);
                        String::from("unknown-feed-onestop-id")
                    }
                },
                gtfs_agency_id: associated_feed.gtfs_agency_id.clone(),
            },
        };

        //if associated_feed_insertion.feed_onestop_id == String::from("f-ucla~bruinbus~rt") {
        //    println!("Bruin realtime feed found! {:?}", associated_feed_insertion);
        //}

        operator_to_feed_hashmap
            .entry(operator.onestop_id.clone())
            .and_modify(|associated_feeds| {
                let set_of_existing_ids: HashSet<String> = HashSet::from_iter(
                    associated_feeds
                        .iter()
                        .map(|feed_item| feed_item.feed_onestop_id.clone()),
                );

                if !set_of_existing_ids.contains(&associated_feed_insertion.feed_onestop_id) {
                    associated_feeds.push(associated_feed_insertion.clone())
                }
            })
            .or_insert(vec![associated_feed_insertion.clone()]);

        feed_to_operator_pairs_hashmap
            .entry(associated_feed_insertion.feed_onestop_id.clone())
            .and_modify(|operator_pairs| {
                let set_of_existing_operator_ids: HashSet<String> = HashSet::from_iter(
                    operator_pairs
                        .iter()
                        .map(|operator_pair| operator_pair.operator_id.clone()),
                );

                if !set_of_existing_operator_ids.contains(&operator.onestop_id) {
                    operator_pairs.push(OperatorPairInfo {
                        operator_id: operator.onestop_id.clone(),
                        gtfs_agency_id: associated_feed_insertion.gtfs_agency_id.clone(),
                    });
                }
            })
            .or_insert(vec![OperatorPairInfo {
                operator_id: operator.onestop_id.clone(),
                gtfs_agency_id: associated_feed_insertion.gtfs_agency_id.clone(),
            }]);
    }
}

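/// Reads every DMFR JSON file under `{path}/feeds/` and aggregates the results.
/// Unreadable or unparsable files are skipped and reported via `list_of_bad_files`.
///
/// Illustrative sketch (assumes a `transitland-atlas` checkout next to the crate, as in
/// the test below):
///
/// ```no_run
/// use dmfr_dataset_reader::read_folders;
///
/// let analysis = read_folders("transitland-atlas/").unwrap();
/// println!(
///     "{} feeds across {} operators",
///     analysis.feed_hashmap.len(),
///     analysis.operator_hashmap.len()
/// );
/// ```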
pub fn read_folders(path: &str) -> Result<ReturnDmfrAnalysis, Box<dyn Error + Send + Sync>> {
    let feed_entries = fs::read_dir(format!("{}/feeds/", path))?;

    let mut feed_hashmap: HashMap<FeedId, dmfr::Feed> = HashMap::new();
    let mut operator_hashmap: HashMap<OperatorId, dmfr::Operator> = HashMap::new();
    let mut operator_to_feed_hashmap: HashMap<OperatorId, Vec<FeedPairInfo>> = HashMap::new();
    let mut feed_to_operator_pairs_hashmap: HashMap<FeedId, Vec<OperatorPairInfo>> = HashMap::new();

    let mut list_of_bad_files: Vec<String> = vec![];

    for entry in feed_entries {
        if let Ok(entry) = entry {
            if let Some(file_name) = entry.file_name().to_str() {
                //println!("{}", file_name);
                let contents = fs::read_to_string(format!("{}/feeds/{}", path, file_name));
                if contents.is_err() {
                    eprintln!(
                        "Error Reading Feed File {}: {}",
                        file_name,
                        contents.unwrap_err()
                    );
                    continue;
                }
                let dmfrinfo: Result<dmfr::DistributedMobilityFeedRegistry, SerdeError> =
                    serde_json::from_str(&contents.unwrap());
                match dmfrinfo {
                    Ok(dmfrinfo) => {
                        for feed in dmfrinfo.feeds.into_iter() {
                            process_feed(
                                &feed,
                                &mut feed_hashmap,
                                &mut operator_hashmap,
                                &mut operator_to_feed_hashmap,
                                &mut feed_to_operator_pairs_hashmap,
                            );
                        }

                        for operator in dmfrinfo.operators.into_iter() {
                            process_operator(
                                &operator,
                                &mut feed_hashmap,
                                &mut operator_hashmap,
                                &mut operator_to_feed_hashmap,
                                &mut feed_to_operator_pairs_hashmap,
                                None,
                            );
                        }
                    }
                    Err(_) => {
                        list_of_bad_files.push(file_name.to_string());
                    }
                }
            }
        }
    }

    //cross check feed_to_operator_hashmap into feed_to_operator_pairs_hashmap

    Ok(ReturnDmfrAnalysis {
        feed_hashmap,
        operator_hashmap,
        operator_to_feed_hashmap,
        feed_to_operator_pairs_hashmap,
        list_of_bad_files: Some(list_of_bad_files),
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test() {
        println!("MAIN TEST");
        let dmfr_result = read_folders("transitland-atlas/").unwrap();

        assert!(dmfr_result.feed_hashmap.len() > 1000);

        fs::write(
            "operator_to_feed_hashmap.json",
            format!("{:#?}", dmfr_result.operator_to_feed_hashmap),
        )
        .expect("Unable to write file");

        fs::write(
            "feed_to_operator_pairs_hashmap.json",
            format!("{:#?}", dmfr_result.feed_to_operator_pairs_hashmap),
        )
        .expect("Unable to write file");

        println!(
            "{} feeds across {} operators",
            dmfr_result.feed_hashmap.len(),
            dmfr_result.operator_hashmap.len()
        );

        println!(
            "Operator to feed hashmap length {}",
            dmfr_result.operator_to_feed_hashmap.len()
        );
        println!(
            "feed_to_operator_pairs_hashmap length {}",
            dmfr_result.feed_to_operator_pairs_hashmap.len()
        );

        assert!(dmfr_result
            .feed_to_operator_pairs_hashmap
            .get("f-ucla~bruinbus~rt")
            .is_some());
        assert!(dmfr_result
            .feed_to_operator_pairs_hashmap
            .get("f-spokanetransitauthority~rt")
            .is_some());
    }
}